Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BUG: handle geometry names in get_speed_positionfixes #419

Merged
merged 1 commit into from
May 31, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 36 additions & 8 deletions tests/model/test_util.py
Original file line number Diff line number Diff line change
@@ -1,12 1,16 @@
import os
from functools import WRAPPER_ASSIGNMENTS
import pytest
import trackintel as ti

from geopandas import GeoDataFrame
import numpy as np
from pandas import Timestamp, Timedelta
import pytest
from geopandas.testing import assert_geodataframe_equal
from shapely.geometry import Point

from trackintel.model.util import _copy_docstring
import trackintel as ti
from trackintel.io.postgis import read_trips_postgis
from trackintel.model.util import _copy_docstring, get_speed_positionfixes


@pytest.fixture
Expand Down Expand Up @@ -35,29 39,29 @@ class TestSpeedPositionfixes:
def test_positionfixes_stable(self, load_positionfixes):
"""Test whether the positionfixes stay the same apart from the new speed column"""
pfs, _ = load_positionfixes
speed_pfs = ti.model.util.get_speed_positionfixes(pfs)
speed_pfs = get_speed_positionfixes(pfs)
assert_geodataframe_equal(pfs, speed_pfs.drop(columns=["speed"]))

def test_accessor(self, load_positionfixes):
"""Test whether the accessor yields the same output as the function"""
pfs, _ = load_positionfixes
speed_pfs_acc = pfs.as_positionfixes.get_speed()
speed_pfs_normal = ti.model.util.get_speed_positionfixes(pfs)
speed_pfs_normal = get_speed_positionfixes(pfs)
assert_geodataframe_equal(speed_pfs_acc, speed_pfs_normal)

def test_speed_correct(self, load_positionfixes):
"""Test whether the correct speed values are computed"""
pfs, correct_speeds = load_positionfixes
# assert first two are the same
speed_pfs = ti.model.util.get_speed_positionfixes(pfs)
speed_pfs = get_speed_positionfixes(pfs)
assert speed_pfs.loc[speed_pfs.index[0], "speed"] == speed_pfs.loc[speed_pfs.index[1], "speed"]
assert np.all(np.isclose(speed_pfs["speed"].values, correct_speeds, rtol=1e-06))

def test_one_speed(self, load_positionfixes):
"""Test for each individual speed whether is is correct"""
pfs, correct_speeds = load_positionfixes
# compute speeds
speed_pfs = ti.model.util.get_speed_positionfixes(pfs)
speed_pfs = get_speed_positionfixes(pfs)
computed_speeds = speed_pfs["speed"].values
# test for each row whether the speed is correct
for ind in range(1, len(correct_speeds)):
Expand All @@ -69,6 73,30 @@ def test_one_speed(self, load_positionfixes):
assert np.isclose(dist / time_diff, computed_speeds[ind], rtol=1e-06)
assert np.isclose(dist / time_diff, correct_speeds[ind], rtol=1e-06)

def test_geometry_name(self, load_positionfixes):
"""Test if the geometry name can be set freely."""
pfs, _ = load_positionfixes
pfs.rename(columns={"geom": "freely_chosen_geometry_name"}, inplace=True)
pfs.set_geometry("freely_chosen_geometry_name", inplace=True)
get_speed_positionfixes(pfs)

def test_planar_geometry(self):
"""Test function for geometry that is planar."""
start_time = Timestamp("2022-05-26 23:59:59")
second = Timedelta("1s")
p1 = Point(0.0, 0.0)
p2 = Point(1.0, 1.0) # distance of sqrt(2)
p3 = Point(4.0, 5.0) # distance of 5
d = [
{"tracked_at": start_time, "g": p1},
{"tracked_at": start_time second, "g": p2},
{"tracked_at": start_time 3 * second, "g": p3},
]
pfs = GeoDataFrame(d, geometry="g", crs="EPSG:2056")
pfs = get_speed_positionfixes(pfs)
correct_speed = np.array((np.sqrt(2), np.sqrt(2), 5 / 2))
assert np.all(np.isclose(pfs["speed"].to_numpy(), correct_speed, rtol=1e-6))


class TestPfsMeanSpeedTriplegs:
def test_triplegs_stable(self, example_triplegs):
Expand Down Expand Up @@ -98,7 126,7 @@ def test_one_speed_correct(self, example_triplegs):
# compute speed for one tripleg manually
test_tpl = tpls.index[0]
test_pfs = pfs[pfs["tripleg_id"] == test_tpl]
pfs_speed = ti.model.util.get_speed_positionfixes(test_pfs)
pfs_speed = get_speed_positionfixes(test_pfs)
test_tpl_speed = np.mean(pfs_speed["speed"].values[1:])
# compare to the one computed in the function
computed_tpls_speed = tpls_speed.loc[test_tpl]["speed"]
Expand Down
32 changes: 13 additions & 19 deletions trackintel/model/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 3,7 @@
import numpy as np
import pandas as pd
from trackintel.geogr.distances import calculate_haversine_length
from trackintel.geogr.point_distances import haversine_dist


def get_speed_positionfixes(positionfixes):
Expand All @@ -25,30 26,23 @@ def get_speed_positionfixes(positionfixes):
positionfix, the speed is set to the same value as for the second one.
"""
pfs = positionfixes.copy()
if_planar_crs = ti.geogr.distances.check_gdf_planar(pfs)

# get next location and time
pfs["prev_geom"] = pfs["geom"].shift(1)
pfs.loc[pfs.index[0], "prev_geom"] = pfs.loc[pfs.index[0], "geom"]
pfs["prev_tracked_at"] = pfs["tracked_at"].shift(1)
is_planar_crs = ti.geogr.distances.check_gdf_planar(pfs)

g = pfs.geometry
# get distance and time difference
if if_planar_crs:
dist_function = lambda point: point.geom.distance(point.prev_geom)
if is_planar_crs:
dist = g.distance(g.shift(1)).to_numpy()
else:
dist_function = lambda point: ti.geogr.point_distances.haversine_dist(
point.geom.x, point.geom.y, point.prev_geom.x, point.prev_geom.y
)[0]
pfs["dist"] = pfs.apply(dist_function, axis=1)
pfs["time_diff"] = pfs.apply(lambda x: x.tracked_at - x.prev_tracked_at, axis=1).dt.total_seconds()
x = g.x.to_numpy()
y = g.y.to_numpy()
dist = np.zeros(len(pfs), dtype=np.float64)
dist[1:] = haversine_dist(x[:-1], y[:-1], x[1:], y[1:])

time_delta = (pfs["tracked_at"] - pfs["tracked_at"].shift(1)).dt.total_seconds().to_numpy()
# compute speed (in m/s)
pfs["speed"] = pfs["dist"] / pfs["time_diff"]
# The first point speed is imputed
pfs.loc[pfs.index[0], "speed"] = pfs.iloc[1]["speed"]

pfs.drop(columns=["prev_geom", "prev_tracked_at", "dist", "time_diff"], inplace=True)

speed = dist / time_delta
speed[0] = speed[1] # The first point speed is imputed
pfs["speed"] = speed
return pfs


Expand Down