The emulated data stream generated by this tool is based on the NYC Taxi & Limousine Commission’s open dataset, expanded with additional routing information from the Google Maps Directions API and with interpolated timestamps to simulate a real-time scenario.
You can find a Cloud Dataflow codelab using the same message format this feeder app is generating at https://github.com/googlecodelabs/cloud-dataflow-nyc-taxi-tycoon.
There is a public Cloud Pub/Sub topic driven by this app.
We generated one week's worth of taxi ride data (from 2015), which you can find in the pubsub-public-billion-taxi-rides bucket with the prefix 'json/yellow_tripdata_2015-01-ext...'
To list all available files and check the format, run gsutil ls gs://pubsub-public-billion-taxi-rides/json
You need a Google Cloud Platform Project with Cloud Pub/Sub APIs enabled.
Create a Pub/Sub topic to stream the taxi rides to. In the docs we use realtime-feed for the real-time streaming.
If you run the feeder locally, it uses your gcloud auth context. Make sure to set the right user and project.
You might need to run gcloud auth login after you have switched to the user you want to use.
When using the Google Cloud SDK gcloud tool with a normal user account, Pub/Sub operations are limited to a rate suitable for manual operations. You can activate a service account as described in the gcloud docs.
The program uses the default credentials from the gcloud environment if no service-account.json is present.
To use a service account for high-QPS publishing, create a key for the Compute Engine default service account in JSON format in the Google Cloud Developer Console and save it as service-account.json.
First build the feeder binary with
go build feeder.go publisher.go scheduler.go taxirides.go debug.go storage.go pubsub.go
You can configure the feeder with environment variables, a config file (-config properties.conf), or command-line parameters.
You can then run the feeder with the following command (detailed help is available with ./feeder --help).
Replace <YOUR_PROJECT_ID> with your Google Cloud Platform Project ID.
GOOGLE_APPLICATION_CREDENTIALS=service-account.json \
BUCKET=pubsub-public-billion-taxi-rides \
PROJECT=<YOUR_PROJECT_ID> \
FILEPREFIX=json/yellow_tripdata_2015-01-ext0000 \
PUBSUBTOPIC=realtime-feed \
DEBUG=true \
./feeder
Remove DEBUG=true if you don't want debug output on stdout.
To run the binary with a config file, use
./feeder -config properties.conf
The config file has the following format.
Replace <YOUR_PROJECT_ID> with your Google Cloud Platform Project ID.
bucket pubsub-public-billion-taxi-rides
project <YOUR_PROJECT_ID>
fileprefix json/yellow_tripdata_2015-01-ext0000
pubsubtopic realtime-feed
debug true
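The config file above is a list of whitespace-separated key/value pairs, one per line. As a rough illustration of that format (not the feeder's actual parser, which is not shown here), the following sketch reads such a file into a map. The function name parseConfig and the handling of blank and '#' lines are assumptions made for this example.

```go
package main

import (
	"bufio"
	"fmt"
	"strings"
)

// parseConfig reads "key value" pairs, one per line, into a map.
// Blank lines and lines starting with '#' are skipped; this is an
// assumption, the source does not say whether comments are supported.
func parseConfig(text string) (map[string]string, error) {
	cfg := make(map[string]string)
	sc := bufio.NewScanner(strings.NewReader(text))
	for n := 1; sc.Scan(); n++ {
		line := strings.TrimSpace(sc.Text())
		if line == "" || strings.HasPrefix(line, "#") {
			continue
		}
		fields := strings.Fields(line)
		if len(fields) != 2 {
			return nil, fmt.Errorf("line %d: expected \"key value\", got %q", n, line)
		}
		cfg[fields[0]] = fields[1]
	}
	return cfg, sc.Err()
}

func main() {
	cfg, err := parseConfig("bucket pubsub-public-billion-taxi-rides\npubsubtopic realtime-feed\ndebug true\n")
	if err != nil {
		panic(err)
	}
	fmt.Println(cfg["bucket"], cfg["pubsubtopic"], cfg["debug"])
}
```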
Alternatively, you can build and run a Docker container.
Build the container with make container. This uses multi-stage Docker builds to produce a small Alpine-based container running the feeder.
To run the container locally, create a properties.env file as follows.
Replace <YOUR_PROJECT_ID> with your Google Cloud Platform Project ID.
echo -e "\
BUCKET=pubsub-public-billion-taxi-rides\n\
PROJECT=<YOUR_PROJECT_ID>\n\
FILEPREFIX=json/yellow_tripdata_2015-01-ext0000\n\
PUBSUBTOPIC=realtime-feed\n\
DEBUG=true" > properties.env
Then run the container. If you don't run it on GCE, you'll need a service account and must set the GOOGLE_APPLICATION_CREDENTIALS=service-account.json environment variable.
docker run --rm -v $PWD/service-account.json:/service-account.json --env-file=properties.env feeder:latest
Replace <YOUR_PROJECT_ID> with your Google Cloud Platform Project ID.
BUCKET=pubsub-public-billion-taxi-rides \
PROJECT=<YOUR_PROJECT_ID> \
FILEPREFIX=json/yellow_tripdata_2015-01-ext000 \
PUBSUBTOPIC=realtime-feed \
DEBUG=true
Install the gcloud auth helper for Docker with gcloud auth configure-docker.
Build the container and push it to Google Cloud Container Registry with make push-gcr.
Create a GCE instance selecting CoreOS stable. Make sure to give the instance API access to Google Cloud Pub/Sub and Google Cloud Storage.
Run the following commands as root (sudo su -) after you ssh into your new GCE instance.
Replace <YOUR_PROJECT_ID> with your Google Cloud Platform Project ID.
echo -e "\
BUCKET=pubsub-public-billion-taxi-rides\n\
PROJECT=<YOUR_PROJECT_ID>\n\
FILEPREFIX=json/yellow_tripdata_2015-01-ext000\n\
PUBSUBTOPIC=realtime-feed\n\
DEBUG=true" > /root/properties.env
To fetch and run the container from the Container Registry, run the following commands.
Replace <YOUR_PROJECT_ID> with your Google Cloud Platform Project ID, <MEMORY_BOUNDARY> with a memory limit (e.g. '10g') within the limits of your GCE instance, and <TOKEN_OUTPUT_OF_PREVIOUS_CMD> with the token printed by gcloud auth application-default print-access-token before you run docker login.
sudo su -
gcloud auth application-default print-access-token
docker login -u oauth2accesstoken -p "<TOKEN_OUTPUT_OF_PREVIOUS_CMD>" https://gcr.io
docker pull gcr.io/<YOUR_PROJECT_ID>/feeder
docker run --restart unless-stopped -m=<MEMORY_BOUNDARY> -d \
  --env-file=properties.env gcr.io/<YOUR_PROJECT_ID>/feeder:latest
To stop your container, find the running container ID with docker ps and run
docker stop <container-id>
There are a few configuration parameters to simulate a high rate of Cloud Pub/Sub message ingestion.
The config parameters SPEEDUP, SKIPRIDES, SKIPOFFSET, STARTREFTIME and LOOP help you run a long-running, high-rate message stream.
SPEEDUP speeds up the real-time feed generated from the input dataset by a factor X.
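To make the effect of a speedup factor concrete, here is a minimal sketch that compresses the time between a reference time and a dataset event. The function name scaledDelay is hypothetical, and dividing the offset by the factor is an assumption about how such a speedup could be applied, not a description of the feeder's scheduler.

```go
package main

import (
	"fmt"
	"time"
)

// scaledDelay returns how long to wait before emitting an event that lies
// offset after the reference time in the dataset, compressed by speedup.
// speedup must be positive; 1 means real time.
func scaledDelay(offset time.Duration, speedup float64) time.Duration {
	return time.Duration(float64(offset) / speedup)
}

func main() {
	ref := time.Date(2015, 1, 4, 20, 0, 0, 0, time.UTC)
	event := ref.Add(10 * time.Minute)

	// An event 10 minutes into the dataset is emitted after 10 seconds at 60x.
	fmt.Println(scaledDelay(event.Sub(ref), 60))
}
```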
SKIPRIDES is used to skip every n-th ride from the input dataset. This lowers the rate of messages for a single feeder instance and distributes the load across multiple feeder instances.
SKIPOFFSET shifts the modulo of SKIPRIDES by n. It should be between 0 and SKIPRIDES - 1.
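One way to picture how a modulo and an offset can split a dataset across several instances is the sketch below. It assumes that an instance handles a ride when the ride's index modulo SKIPRIDES equals SKIPOFFSET; that selection rule and the name keepRide are assumptions for illustration, so check ./feeder --help or the source for the exact behaviour.

```go
package main

import "fmt"

// keepRide reports whether the instance configured with skipRides and
// skipOffset handles the ride at the given index. The rule
// index % skipRides == skipOffset is an assumption for this sketch.
func keepRide(index, skipRides, skipOffset int) bool {
	if skipRides <= 1 {
		return true // no skipping configured
	}
	return index%skipRides == skipOffset
}

func main() {
	const skipRides = 3 // three feeder instances share the dataset
	for offset := 0; offset < skipRides; offset++ {
		var handled []int
		for index := 0; index < 9; index++ {
			if keepRide(index, skipRides, offset) {
				handled = append(handled, index)
			}
		}
		fmt.Printf("SKIPOFFSET=%d handles rides %v\n", offset, handled)
	}
}
```

Under this assumption each ride is handled by exactly one instance, so running one instance per offset covers the whole dataset at one third of the rate each.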
STARTREFTIME is used to synchronize all feeder instances for realistic, scalable ride streaming. The format is 2015-01-04 20:00:00, interpreted in the timezone of the dataset, which is America/New_York by default. The dataset timezone can be set with DATASETLOCATION.
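As an illustration of what interpreting a reference time in the dataset timezone means, the following sketch parses a value in the documented format with Go's standard library. The function name parseRefTime is hypothetical and this is not taken from the feeder's source; the time/tzdata import is only there so the example does not depend on a system timezone database.

```go
package main

import (
	"fmt"
	"time"
	_ "time/tzdata" // embeds the timezone database so LoadLocation works everywhere
)

// refTimeLayout is the Go layout matching values such as 2015-01-04 20:00:00.
const refTimeLayout = "2006-01-02 15:04:05"

// parseRefTime interprets value as wall-clock time in the named location.
func parseRefTime(value, location string) (time.Time, error) {
	loc, err := time.LoadLocation(location)
	if err != nil {
		return time.Time{}, fmt.Errorf("load location %q: %w", location, err)
	}
	return time.ParseInLocation(refTimeLayout, value, loc)
}

func main() {
	ref, err := parseRefTime("2015-01-04 20:00:00", "America/New_York")
	if err != nil {
		panic(err)
	}
	fmt.Println("dataset time:", ref)
	fmt.Println("as UTC:      ", ref.UTC())
}
```

Because every instance resolves the same string to the same instant, instances started at different moments can still agree on where in the dataset the stream begins.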
LOOP is used for long-running ride streaming. It resets the refTime by the same duration on all instances and re-reads the input dataset in a loop.
MAXSCHEDULERS restricts the maximum number of parallel ride schedulers and thus how much memory is used for in-memory pending schedulers. It will slow down file parsing.
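The trade-off described here, bounded memory at the cost of a slower producer, is typical of a counting semaphore. The sketch below shows that general pattern with a buffered channel; runLimited is a hypothetical name and this is not the feeder's scheduler code.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// runLimited starts each task in its own goroutine but allows at most
// maxParallel of them to run at once. It returns the peak concurrency seen.
func runLimited(tasks []func(), maxParallel int) int {
	sem := make(chan struct{}, maxParallel)
	var (
		wg      sync.WaitGroup
		mu      sync.Mutex
		running int
		peak    int
	)
	for _, task := range tasks {
		// Acquiring a slot blocks the producer (here: this loop) while
		// maxParallel tasks are active, which is what slows down parsing.
		sem <- struct{}{}
		wg.Add(1)
		go func(task func()) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot when the task ends
			mu.Lock()
			running++
			if running > peak {
				peak = running
			}
			mu.Unlock()

			task()

			mu.Lock()
			running--
			mu.Unlock()
		}(task)
	}
	wg.Wait()
	return peak
}

func main() {
	tasks := make([]func(), 20)
	for i := range tasks {
		tasks[i] = func() { time.Sleep(5 * time.Millisecond) }
	}
	peak := runLimited(tasks, 4)
	fmt.Println("peak concurrency within limit:", peak <= 4)
}
```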
MAXBUFFEREDMSGS sets the maximum number of outstanding messages waiting to be published to Pub/Sub. It is used to restrict memory usage. When the buffer is full, messages are discarded.
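A bounded buffer that discards instead of blocking can be sketched in Go with a buffered channel and a non-blocking send. The name tryEnqueue is hypothetical and the snippet only illustrates the drop-when-full behaviour described above, not the feeder's publisher.

```go
package main

import "fmt"

// tryEnqueue attempts to buffer msg without blocking and reports whether
// it was accepted. When the buffer is full the message is dropped.
func tryEnqueue(buf chan<- string, msg string) bool {
	select {
	case buf <- msg:
		return true
	default:
		return false
	}
}

func main() {
	const maxBufferedMsgs = 2 // stands in for MAXBUFFEREDMSGS
	buf := make(chan string, maxBufferedMsgs)

	dropped := 0
	for i := 0; i < 5; i++ {
		if !tryEnqueue(buf, fmt.Sprintf("msg-%d", i)) {
			dropped++
		}
	}
	fmt.Printf("buffered=%d dropped=%d\n", len(buf), dropped) // buffered=2 dropped=3
}
```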
PORT sets the port for the Prometheus metrics endpoint.
SPEEDUP=<SPEEDUP_FACTOR>\n\
SKIPRIDES=<MODULO_N_TO_SKIP_RIDES_AND_LOWER_QPS>\n\
SKIPOFFSET=<SKIPRIDES_MODULO_OFFSET_FOR_SCALEOUT>\n\
STARTREFTIME=<START_DATASET_REFTIME>\n\
LOOP=[true|false]\n\
There are more options available if you use your own dataset. Please refer to ./feeder --help for details.
The feeder exposes a Prometheus metrics endpoint. The exposed metrics are:
- Rides loaded - ride_counter{type="loaded"}
- Rides processed - ride_counter{type="processed"}
- Rides invalid - ride_counter{type="invalid"}
- Points loaded - point_counter{type="loaded"}
- Points scheduled - point_counter{type="scheduled"}
- Points failed - point_counter{type="failed"}
- Messages sent - message_counter{type="sent"}
- Messages failed - message_counter{type="failed"}
- Pub/Sub backlog - pubsub_backlog
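For readers unfamiliar with what such an endpoint returns, the sketch below serves labelled counters in the Prometheus text exposition format using only the Go standard library. It is an illustration under assumptions: the counterSet type and its methods are hypothetical, the feeder presumably uses a Prometheus client library instead, and TYPE/HELP comment lines are omitted for brevity.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"sort"
	"strings"
	"sync"
)

// counterSet holds monotonically increasing counters keyed by series,
// e.g. ride_counter{type="loaded"}.
type counterSet struct {
	mu     sync.Mutex
	values map[string]uint64
}

func newCounterSet() *counterSet {
	return &counterSet{values: make(map[string]uint64)}
}

// inc increments the counter for the given metric name and type label.
func (c *counterSet) inc(name, typ string) {
	key := fmt.Sprintf("%s{type=%q}", name, typ)
	c.mu.Lock()
	c.values[key]++
	c.mu.Unlock()
}

// render returns one "series value" line per counter, sorted by series.
func (c *counterSet) render() string {
	c.mu.Lock()
	defer c.mu.Unlock()
	keys := make([]string, 0, len(c.values))
	for k := range c.values {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	var b strings.Builder
	for _, k := range keys {
		fmt.Fprintf(&b, "%s %d\n", k, c.values[k])
	}
	return b.String()
}

// ServeHTTP makes counterSet usable as the handler for /metrics.
func (c *counterSet) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "text/plain; version=0.0.4")
	fmt.Fprint(w, c.render())
}

func main() {
	cs := newCounterSet()
	cs.inc("ride_counter", "loaded")
	cs.inc("message_counter", "sent")
	cs.inc("message_counter", "sent")

	// Render once through the handler; a long-running process would instead
	// call http.Handle("/metrics", cs) and http.ListenAndServe(":8080", nil).
	rec := httptest.NewRecorder()
	cs.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/metrics", nil))
	fmt.Print(rec.Body.String())
}
```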
To calculate rates in Prometheus you can use, for example,
rate(message_counter{type="sent"}[15s])
To scrape the telemetry data with Prometheus, here is a minimal config, prometheus.yml. You need to replace <feeder-ip> with the IP where the feeder is running.
global:
  scrape_interval: 15s
  evaluation_interval: 30s
scrape_configs:
  - job_name: feeder_scrape
    scrape_interval: 5s
    scrape_timeout: 5s
    metrics_path: /metrics
    static_configs:
      - targets:
          - <feeder-ip>:8080
You can run Prometheus in Docker:
docker run -p 9090:9090 -v `pwd`/prometheus.yml:/prometheus.yml \
prom/prometheus --config.file=/prometheus.yml
The source dataset contains several rides with very long durations. To avoid running out of memory in continuously running feeders, we filter those out with an upper limit of 6 hours. You can find those rides by using BigQuery on the publicly available NYC Taxi Rides dataset.
#StandardSQL
SELECT TIMESTAMP_DIFF(dropoff_datetime, pickup_datetime, MINUTE) as trip_time, *
FROM `bigquery-public-data.new_york.tlc_yellow_trips_2015`
WHERE TIMESTAMP_DIFF(dropoff_datetime, pickup_datetime, MINUTE) > 360
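The same 6-hour cut-off can be expressed as a small predicate. This is a minimal sketch of such a filter, assuming the limit is inclusive; the names maxRideDuration and withinLimit are hypothetical and not taken from the feeder's source.

```go
package main

import (
	"fmt"
	"time"
)

// maxRideDuration mirrors the 6-hour upper limit described above.
const maxRideDuration = 6 * time.Hour

// withinLimit reports whether a ride of the given duration is kept.
// Treating exactly 6 hours as allowed is an assumption of this sketch.
func withinLimit(d time.Duration) bool {
	return d <= maxRideDuration
}

func main() {
	pickup := time.Date(2015, 1, 4, 20, 0, 0, 0, time.UTC)
	short := pickup.Add(25 * time.Minute)
	long := pickup.Add(7 * time.Hour)

	fmt.Println(withinLimit(short.Sub(pickup))) // true
	fmt.Println(withinLimit(long.Sub(pickup)))  // false
}
```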
We use govendor (go get -u github.com/kardianos/govendor) as the vendor package manager.
Contributions to this repository are always welcome and highly encouraged.
See CONTRIBUTING for more information on how to get started.
Apache 2.0 - See LICENSE for more information.
Use: The NYC Taxi & Limousine Commission’s dataset is publicly available for anyone to use under the following terms provided by the Dataset Source —https://data.cityofnewyork.us/— and is provided "AS IS" without any warranty, express or implied, from Google. Google disclaims all liability for any damages, direct or indirect, resulting from the use of the dataset.
This is not an official Google product.