
Pub/Sub Taxi Rides feeder program written in Go

The emulated data stream generated by this tool is based on the NYC Taxi & Limousine Commission’s open dataset, expanded with additional routing information using the Google Maps Directions API and with interpolated timestamps to simulate a real-time scenario.

Codelab

You can find a Cloud Dataflow codelab that uses the same message format this feeder generates at https://github.com/googlecodelabs/cloud-dataflow-nyc-taxi-tycoon.

Public Pub/Sub Topic

There is a public Cloud Pub/Sub topic driven by this app.
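If you only want to consume the stream, you can attach your own subscription to it. As an assumption based on the associated codelab, the public topic is projects/pubsub-public-data/topics/taxirides-realtime; for example:

gcloud pubsub subscriptions create taxirides-sample \
  --topic=projects/pubsub-public-data/topics/taxirides-realtime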

Sample Data & Prerequisites

We generated one week's worth of taxi ride data (from 2015) that you can find in the pubsub-public-billion-taxi-rides bucket with the prefix 'json/yellow_tripdata_2015-01-ext...'

To see all available files and check the format, run:

gsutil ls gs://pubsub-public-billion-taxi-rides/json

You need a Google Cloud Platform Project with Cloud Pub/Sub APIs enabled.

Create a Pub/Sub topic to stream the taxi rides to. In these docs we use realtime-feed for the real-time stream.
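Assuming you have the gcloud CLI installed, you can create the topic with:

gcloud pubsub topics create realtime-feed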

Running the program locally

If you run the feeder locally, it uses your gcloud auth context. Make sure the right user and project are set. You might need to run gcloud auth login after you switch to the user you want to use.
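For example (the project ID is a placeholder for your own):

gcloud auth login
gcloud config set project <YOUR_PROJECT_ID>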

When using the Google Cloud SDK gcloud tool with a normal user account, Pub/Sub operations are limited to a rate suitable for manual operations. You can activate a service account as described in the gcloud docs.
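For example, assuming the key file described below is saved as service-account.json:

gcloud auth activate-service-account --key-file=service-account.json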

The program uses the default credentials from the gcloud environment if no service-account.json is present.

To use a service account for high-QPS publishing, create a key for the Compute Engine default service account in JSON format in the Google Cloud Console and save it as service-account.json.
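One way to create such a key from the command line; <SERVICE_ACCOUNT_EMAIL> is a placeholder for your service account's email address:

gcloud iam service-accounts keys create service-account.json \
  --iam-account=<SERVICE_ACCOUNT_EMAIL>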

First build the feeder binary with

go build feeder.go publisher.go scheduler.go taxirides.go debug.go storage.go pubsub.go
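Assuming these files make up the main package in the repository root, the following shorthand should be equivalent:

go build -o feeder .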

Configuration can be supplied via environment variables, a config file (-config properties.conf), or command-line parameters. You can then use the following command to run the feeder (detailed help is available via ./feeder --help).

Replace <YOUR_PROJECT_ID> with your Google Cloud Platform Project ID.

GOOGLE_APPLICATION_CREDENTIALS=service-account.json \
BUCKET=pubsub-public-billion-taxi-rides \
PROJECT=<YOUR_PROJECT_ID> \
FILEPREFIX=json/yellow_tripdata_2015-01-ext0000 \
PUBSUBTOPIC=realtime-feed \
DEBUG=true \
./feeder

Remove DEBUG=true if you don't want to get the debug output on stdout.

To run the binary with a config file do

./feeder -config properties.conf

The config file has the following format.

Replace <YOUR_PROJECT_ID> with your Google Cloud Platform Project ID.

bucket pubsub-public-billion-taxi-rides
project <YOUR_PROJECT_ID>
fileprefix json/yellow_tripdata_2015-01-ext0000
pubsubtopic realtime-feed
debug true

Alternatively, you can build and run a Docker container. Build the container with make container. This uses multi-stage Docker builds to produce a small Alpine-based image that runs the feeder.

To run the container locally create a properties.env as follows:

Replace <YOUR_PROJECT_ID> with your Google Cloud Platform Project ID.

echo -e "\
BUCKET=pubsub-public-billion-taxi-rides\n\
PROJECT=<YOUR_PROJECT_ID>\n\
FILEPREFIX=json/yellow_tripdata_2015-01-ext0000\n\
PUBSUBTOPIC=realtime-feed\n\
DEBUG=true" > properties.env

and then run the container. If you don't run it on GCE, you'll need a service account and must set the GOOGLE_APPLICATION_CREDENTIALS environment variable so the feeder can find it.

docker run --rm -v $PWD/service-account.json:/service-account.json --env-file=properties.env feeder:latest
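The command above mounts the key at /service-account.json inside the container. Assuming the containerized feeder reads GOOGLE_APPLICATION_CREDENTIALS the same way as the local binary, you can point the variable at that path via the env file:

echo "GOOGLE_APPLICATION_CREDENTIALS=/service-account.json" >> properties.env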

Example config reading from JSON-formatted input

Replace <YOUR_PROJECT_ID> with your Google Cloud Platform Project ID.

BUCKET=pubsub-public-billion-taxi-rides \
PROJECT=<YOUR_PROJECT_ID> \
FILEPREFIX=json/yellow_tripdata_2015-01-ext000 \
PUBSUBTOPIC=realtime-feed \
DEBUG=true \
./feeder

Running the program on GCE

Install the gcloud auth helper for Docker with gcloud auth configure-docker.
Build the container and push it to Google Cloud Container Registry with make push-gcr.

Create a GCE instance using the CoreOS stable image. Make sure to give the instance API access to Google Cloud Pub/Sub and Google Cloud Storage.
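One possible way to create such an instance from the command line (the instance name is a placeholder; the image flags and scope aliases reflect the CoreOS image project available at the time):

gcloud compute instances create taxi-feeder \
  --image-family=coreos-stable \
  --image-project=coreos-cloud \
  --scopes=pubsub,storage-ro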

Run the following commands under root (sudo su -) after you ssh into your new GCE instance. Replace <YOUR_PROJECT_ID> with your Google Cloud Platform Project ID.

echo -e "\
BUCKET=pubsub-public-billion-taxi-rides\n\
PROJECT=<YOUR_PROJECT_ID>\n\
FILEPREFIX=json/yellow_tripdata_2015-01-ext000\n\
PUBSUBTOPIC=realtime-feed\n\
DEBUG=true" > /root/properties.env

To fetch and run the container from the Container Registry, run the following commands. Replace <YOUR_PROJECT_ID> with your Google Cloud Platform Project ID, <MEMORY_BOUNDARY> with a memory limit within the limits of your GCE instance (e.g. 10g), and <TOKEN_OUTPUT_OF_PREVIOUS_CMD> with the token printed by the gcloud auth command before you run docker login.

sudo su -
gcloud auth application-default print-access-token
docker login -u oauth2accesstoken -p "<TOKEN_OUTPUT_OF_PREVIOUS_CMD>" https://gcr.io
docker pull gcr.io/<YOUR_PROJECT_ID>/feeder
docker run --restart unless-stopped -m=<MEMORY_BOUNDARY> -d \
--env-file=properties.env gcr.io/<YOUR_PROJECT_ID>/feeder:latest

To stop your container, find the running container ID with docker ps and run docker stop <container-id>.
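For example:

docker ps
docker stop <container-id>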

Advanced Configuration, Speedup, Scale-Out, Memory Limit

There are a couple of configuration parameters to simulate a high rate of Cloud Pub/Sub message ingestion. The config parameters SPEEDUP, SKIPRIDES, SKIPOFFSET, STARTREFTIME and LOOP help with running a long-running, high-rate message stream. A two-instance scale-out sketch follows the parameter list below.

SPEEDUP speeds up the real-time feed generated from the input dataset by a factor of X.

SKIPRIDES makes an instance publish only every n-th ride from the input dataset. This lowers the message rate of a single feeder instance and lets you distribute the load across multiple feeder instances.

SKIPOFFSET shifts the SKIPRIDES modulo by n. It should be between 0 and SKIPRIDES - 1.

STARTREFTIME synchronizes all feeder instances for realistic, scalable ride streaming. The format is 2015-01-04 20:00:00, interpreted in the timezone of the dataset, by default America/New_York. The dataset timezone can be set with DATASETLOCATION.

LOOP is used for long-running ride streaming. It resets the refTime by the same duration on all instances and re-reads the input dataset in a loop.

MAXSCHEDULERS restricts the maximum number of parallel ride schedulers, and thus how much memory is used for in-memory pending schedulers. It will slow down file parsing.

MAXBUFFEREDMSGS sets the maximum number of outstanding messages to publish to Pub/Sub, used to restrict memory usage. When the buffer is full, messages are discarded.

PORT sets the port for the Prometheus metrics endpoint.

SPEEDUP=<SPEEDUP_FACTOR>
SKIPRIDES=<MODULO_N_TO_SKIP_RIDES_AND_LOWER_QPS>
SKIPOFFSET=<SKIPRIDES_MODULO_OFFSET_FOR_SCALEOUT>
STARTREFTIME=<START_DATASET_REFTIME>
LOOP=[true|false]
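As a sketch of a two-instance scale-out (all values are hypothetical): each instance publishes every second ride, offset so the two interleave, and both share the same reference time so their simulated clocks line up.

SPEEDUP=30 SKIPRIDES=2 SKIPOFFSET=0 \
STARTREFTIME="2015-01-04 20:00:00" LOOP=true \
BUCKET=pubsub-public-billion-taxi-rides \
PROJECT=<YOUR_PROJECT_ID> \
FILEPREFIX=json/yellow_tripdata_2015-01-ext000 \
PUBSUBTOPIC=realtime-feed \
./feeder

Run the second instance identically but with SKIPOFFSET=1.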

There are more options available if you use your own dataset. Please refer to ./feeder --help for details.

Telemetry

The feeder exposes a Prometheus metrics endpoint. The exposed metrics are:

  • Rides loaded - ride_counter{type="loaded"}
  • Rides processed - ride_counter{type="processed"}
  • Rides invalid - ride_counter{type="invalid"}
  • Points loaded - point_counter{type="loaded"}
  • Points scheduled - point_counter{type="scheduled"}
  • Points failed - point_counter{type="failed"}
  • Messages sent - message_counter{type="sent"}
  • Messages failed - message_counter{type="failed"}
  • Pub/Sub backlog - pubsub_backlog

To calculate rates in Prometheus you can use, for example, rate(message_counter{type="sent"}[15s])
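For example, per-second rates over a one-minute window for some of the counters above:

rate(message_counter{type="sent"}[1m])
rate(ride_counter{type="processed"}[1m])
rate(point_counter{type="scheduled"}[1m])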

To scrape the telemetry data with Prometheus, here is a minimal prometheus.yml config. Replace <feeder-ip> with the IP where the feeder is running.

global:
  scrape_interval:     15s
  evaluation_interval: 30s

scrape_configs:
- job_name: feeder_scrape
  scrape_interval: 5s
  scrape_timeout:  5s
  metrics_path: /metrics
  static_configs:
    - targets:
      - <feeder-ip>:8080

You can run Prometheus in Docker:

docker run -p 9090:9090 -v `pwd`/prometheus.yml:/prometheus.yml \
prom/prometheus --config.file=/prometheus.yml

Filtering invalid data

The source dataset contains several rides with very long durations. To avoid running out of memory in continuously running feeders, we filter those with an upper limit of 6 hours. You can find those rides by querying the publicly available NYC Taxi Rides dataset in BigQuery.

#standardSQL
SELECT TIMESTAMP_DIFF(dropoff_datetime, pickup_datetime, MINUTE) as trip_time, *
FROM `bigquery-public-data.new_york.tlc_yellow_trips_2015`
WHERE TIMESTAMP_DIFF(dropoff_datetime, pickup_datetime, MINUTE) > 360

Vendor Packaging

We use govendor (go get -u github.com/kardianos/govendor) as the vendor package manager.

Contributing

Contributions to this repository are always welcome and highly encouraged.

See CONTRIBUTING for more information on how to get started.

License

Apache 2.0 - See LICENSE for more information.

Use: The NYC Taxi & Limousine Commission’s dataset is publicly available for anyone to use under the terms provided by the Dataset Source (https://data.cityofnewyork.us/) and is provided "AS IS" without any warranty, express or implied, from Google. Google disclaims all liability for any damages, direct or indirect, resulting from the use of the dataset.

This is not an official Google product.
