- Python 3
- Java 8+ needs to be installed (and available in PATH) for Spark (PySpark 2.2+) to work
- Create a virtual env, activate it, and install es-retriever:

```bash
source /path/to/venv/bin/activate
cd /path/to/setup.py   # i.e. the directory containing setup.py
pip install -e .
```

(note: the `-e` flag is important to be able to change the configuration. This will be changed in the future)
- Run the tests with:

```bash
python -m pytest
```
NOTE: While the script is running, `localhost:4040` serves the Spark UI with information about what Spark is doing.
Under `es-pyspark-retriever/src/es_retriever/examples` there is a simple example that retrieves data for a single hour of one day, and an example that retrieves data for one week.
Configuration information should be given as parameters when initializing the `Config` object, for instance `Config("localhost", "user", "password", "deflect.log", "deflect_access")`.
```python
from datetime import datetime
from es_retriever.config import Config
from es_retriever.es.storage import EsStorage

# create a configuration instance
config = Config(SERVER, "user", "password", "deflect.log", "deflect_access")

# create an EsStorage instance with the given configuration
storage = EsStorage(config)
```
```python
# note: the following will create multiple files (file parts)
# write to a file as json
df.write.mode("overwrite").json("somefilename")
# or as csv
df.write.mode("overwrite").csv("somefilename")
```
To save to a single file instead of multiple parts:

```python
df.coalesce(1).write.mode("overwrite").json("full/path/to/file")
```
Mode can be:

* `append`: Append contents to existing data.
* `overwrite`: Overwrite existing data.
* `error` or `errorifexists`: Throw an exception if data already exists.
* `ignore`: Silently ignore this operation if data already exists.
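For instance, a minimal sketch of appending a new day's dataframe to an already saved dataset (the path is hypothetical):

```python
# append the new dataframe to data previously written at this (hypothetical) path,
# instead of overwriting it
df.write.mode("append").json("/data/deflect_logs")
```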
Get data for one day, from 2018-04-11 to 2018-04-12:
```python
# the dates we need to look for
since = datetime(2018, 4, 11)
until = datetime(2018, 4, 12)

# get the data for the period of time
df = storage.get(since, until)
```
If hours / minutes / seconds are specified in since / until, the filtering of the data will take that into account.
```python
# the dates we need to look for
since = datetime(2018, 4, 11, 10, 0, 0)
until = datetime(2018, 4, 11, 11, 0, 0)

# get the data for the period of time
df = storage.get(since, until)
```
```python
from pyspark.sql import functions as F

# the dates we need to look for
since = datetime(2018, 4, 11, 10, 0, 0)
until = datetime(2018, 4, 11, 11, 0, 0)

# hour filter:
hour_filter = (
    (F.col("@timestamp") >= since) & (F.col("@timestamp") <= until)
)

# get the data for the period of time
df = storage.get(since, until, hour_filter)

# or use the hour-of-day field in the logs:
hour_filter = (
    (F.col("hour-of-day") >= 10) & (F.col("hour-of-day") <= 11)
)

# get the data for the period of time
df = storage.get(since, until, hour_filter)
```
```python
# the dates we need to look for
since = datetime(2018, 4, 11)
until = datetime(2018, 4, 18)

# get the data for the period of time
df = storage.get(since, until)
```
```python
from datetime import datetime, timedelta

# define the start day: 2018-04-01, 10:00 to 11:00
since = datetime(2018, 4, 1, 10, 0, 0)
until = datetime(2018, 4, 1, 11, 0, 0)

# iterate for a week
for day_no in range(7):
    # keep the name for convenience
    file_name = "{}-{year}.{month}.{day}.{hour}.{minute}.{second}".format(
        config.es_base_index,
        year=since.year,
        month=str(since.month).zfill(2),
        day=str(since.day).zfill(2),
        hour=str(since.hour).zfill(2),
        minute=str(since.minute).zfill(2),
        second=str(since.second).zfill(2)
    )

    # get the data for the period of time
    df = storage.get(since, until)

    print("Retrieving data for {}".format(file_name))

    # save
    df.write.mode("overwrite").json(file_name)

    # increment since and until by one day
    since = since + timedelta(days=1)
    until = until + timedelta(days=1)
```
```python
# define the start day:
since = datetime(2018, 4, 1)
until = datetime(2018, 4, 7)

# get only 10 to 11 am
hour_filter = (
    (F.col("hour-of-day") >= 10) & (F.col("hour-of-day") <= 11)
)

# get the data for the period of time
df = storage.get(since, until, hour_filter)

# save
df.write.mode("overwrite").json("data_for_week_somenumber")
```
A bit more complex hour filter:
```python
# define the start day:
since = datetime(2018, 4, 1)
until = datetime(2018, 4, 7)

# get only 10 to 11 am and 15 to 16 pm
# note: the two hour ranges must be combined with | (OR), otherwise the filter
# can never match
hours_filter = (
    ((F.col("hour-of-day") >= 10) & (F.col("hour-of-day") <= 11)) |
    ((F.col("hour-of-day") >= 15) & (F.col("hour-of-day") <= 16))
)

# get the data for the period of time
df = storage.get(since, until, hours_filter)
```
To filter the rows where some field matches a regular expression:

```python
df = df.filter(df["field / column name"].rlike("regex expression"))
```
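For instance, a minimal sketch that keeps only rows whose `client_ip` matches a pattern (the column name and regex are illustrative, echoing the regex filter example in the CLI options below):

```python
# keep only rows whose client_ip matches the (illustrative) 95.11.x.111 pattern
df = df.filter(df["client_ip"].rlike(r"95\.11\.\d{1,3}\.111"))
```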
When the es-pyspark-retriever package is installed, it also installs an `esretrieve` command. To use this tool, you first need to create a configuration file in `~/opsdash` with the following format:

```ini
[OpsDash]
user: USERNAME HERE
password: PASSWORD HERE
```
The options so far:

```text
Script to retrieve data from ElasticSearch using Pyspark

positional arguments:
  since                 The start date in the format YYYY-MM-DD HH:MM:SS, e.g.
                        2018-01-01 00:00:00
  until                 The end date in the format YYYY-MM-DD HH:MM:SS, e.g.
                        2018-01-02 00:00:00

optional arguments:
  -h, --help            show this help message and exit
  -f [FILTER [FILTER ...]], --filter [FILTER [FILTER ...]]
                        extra filters separated by space, e.g. to get data for
                        a specific dnet and a specific ip, use dnet=somednet
                        client_ip=95.11.1.111
  -rf [REGEX_FILTER [REGEX_FILTER ...]], --regex_filter [REGEX_FILTER [REGEX_FILTER ...]]
                        extra regex filters separated by space, e.g.
                        dnet="some_valid_regex"
                        client_ip="95\.11\.\d{1,3}\.111"
  -sf SQL_FILTER, --sql_filter SQL_FILTER
                        SQL-like filter for more complex querying, e.g.
                        dnet="somednet" OR day-of-week>=2
  -o OUTPUT, --output OUTPUT
                        The full path and name of the file to save the result
                        to, defaults to current working dir/base_index
  -b, --banjax          Query Banjax logs instead of Deflect logs
```
To get data from deflect.log from "2018-04-23 10:00:00" to "2018-04-23 11:00:00" where dnet equals somednet and client_ip equals 95.111.111.111:

```bash
python simple_retrieve_cli.py "2018-04-23 10:00:00" "2018-04-23 11:00:00" -f dnet=somednet client_ip=95.111.111.111 -o /some/path/to/file
```
Sample output of the above:

```text
18/04/24 15:47:38 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
18/04/24 15:47:38 WARN Utils: Your hostname, SOMEHOSTNAME resolves to a loopback address: 127.0.1.1; using SOMEOTHERIP instead (on interface SOMEINTERFACE)
18/04/24 15:47:38 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Getting data from 2018-04-23 10:00:00 to 2018-04-23 11:00:00, filter(s) Column<((dnet = somednet) AND (client_ip = 95.111.111.111))>
root
 |-- @timestamp: timestamp (nullable = true)
 |-- field1: string (nullable = true)
 |-- field2: string (nullable = true)
...
...
...
Starting data retrieval and saving to /some/path/to/file
```
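A similar invocation using the regex filter option instead might look like this (the values are illustrative, reusing the `-rf` example from the help text above):

```bash
python simple_retrieve_cli.py "2018-04-23 10:00:00" "2018-04-23 11:00:00" -rf dnet="somednet" client_ip="95\.11\.\d{1,3}\.111" -o /some/path/to/file
```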
The common steps are:

- Import the necessary modules
- Create a configuration instance
- Create a spark session
- Use `spark.read.json(path to folder)` or `spark.read.csv(path to folder)` to load the data
```python
from es_retriever.config import Config
from es_retriever.spark import get_or_create_spark_session

config = Config(SERVER, "user", "password", "deflect.log", "deflect_access")

# create a spark session using the configuration
spark = get_or_create_spark_session(config)

# read all files that start with some-folder-name-previously-stored-with-spark
# - e.g. when you have different files for different days, this will load them
#   all into one dataframe
# note: loading is lazy and Spark will spill to disk if there is not enough RAM
df = spark.read.json("/path/to/some-folder-name-previously-stored-with-spark*")
```
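If the data was saved as CSV instead, a minimal sketch of loading it back (the `header` and `inferSchema` options are assumptions; adjust them to match how the files were written):

```python
# read CSV parts back into one dataframe; header/inferSchema only make sense
# if the files were written with a header row
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("/path/to/some-folder-name-previously-stored-with-spark*")
)
```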
Since Spark operates lazily, we need to cache the dataframe once we get the data if we intend to perform several actions on it, e.g.:

```python
df = (
    storage.get(since, until, hours_filter)
    .filter(...)
    .select(...)
    .cache()  # after filters
)

# then:
print(df.count())

# if we do not cache, the data will be fetched twice: once for the load and
# filtering, and once for the count
```
In general, it is good practice to save the data first and then, in a separate session, load it from file and perform actions on it.
```python
df.printSchema()
>>>
root
 |-- @timestamp: string (nullable = true)
 |-- field1: string (nullable = true)
 |-- field2: string (nullable = true)
...

df.count()
>>> 1276263
```
df.select("@timestamp", "field1").show()
>>>
+--------------------+--------------------+
| @timestamp| field1|
+--------------------+--------------------+
|2018-04-01T10:36:...| data...|
|2018-04-01T10:36:...| data...|
|2018-04-01T10:36:...| data...|
|2018-04-01T10:34:...| data...|
|2018-04-01T10:36:...| data...|
|2018-04-01T10:36:...| data...|
|2018-04-01T10:36:...| data...|
|2018-04-01T10:36:...| data...|
|2018-04-01T10:36:...| data...|
|2018-04-01T10:36:...| data...|
|2018-04-01T10:36:...| data...|
|2018-04-01T10:36:...| data...|
|2018-04-01T10:36:...| data...|
|2018-04-01T10:36:...| data...|
|2018-04-01T10:36:...| data...|
|2018-04-01T10:36:...| data...|
|2018-04-01T10:36:...| data...|
|2018-04-01T10:34:...| data...|
|2018-04-01T10:36:...| data...|
|2018-04-01T10:36:...| data...|
+--------------------+--------------------+
only showing top 20 rows
```python
specific_df = df.select("*").where(df.field1 == "hello")
specific_df.show()  # will output the results
```

A more specific example:

```python
specific_df = df.select("*").where(
    (df.content_type == "image/jpeg") & (df.dnet == "somednet")
)
specific_df.show()  # will output the results
```
To run `simple_retrieve_cli.py`:

- Create a `.env` file according to the `dot_env_example`
- Modify `docker-compose.yaml` according to the options and filters explained in the CLI example section:

```
command: python simple_retrieve_cli.py ${ELK_HOST} es-index es-index-type "start datetime" "end datetime" -u ${ELK_USER} -p -o /data/nameofthefolder
# for example:
command: python simple_retrieve_cli.py ${ELK_HOST} test.log web_access "2019-09-01 00:00:00" "2019-09-01 01:00:00" -u ${ELK_USER} -p -o /data/testdata
```
The example above will get all data between "2019-09-01 00:00:00" and "2019-09-01 01:00:00" and store it under the `/data/testdata_YYYY_MM_dd` folder.
If the date range spans more than a single day, the data will be stored in separate folders `/data/testdata_YYYY_MM_day1`, `/data/testdata_YYYY_MM_day2`, etc.
- Run:

```bash
docker-compose run --rm --service-ports es_retriever
```

- You will be prompted for your ElasticSearch password. This process will probably take a long time to complete, depending on the date range and the criteria given, so it is better to run it inside e.g. `screen` (see the sketch below). You can also check the Spark console under `localhost:4040`.
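For instance, a minimal sketch of running the retrieval inside a detachable `screen` session (the session name is arbitrary):

```bash
# start a named screen session and run the retrieval inside it
screen -S es_retriever
docker-compose run --rm --service-ports es_retriever
# detach with Ctrl-a d, then reattach later with:
screen -r es_retriever
```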
- Better docstrings
- Unittests to be added
- EsStorage assumes time based indices
- Config assumes ssl for es
- Currently, `geoip` and `tags` need to be filtered out, otherwise saving fails (see the sketch after this list).
- Getting one year's data with 1h sampling (a different hour each time) needs some grouping for the requests to be efficient.
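A minimal sketch of the workaround mentioned above, dropping the problematic columns before saving (assuming they are named `geoip` and `tags` in your dataframe):

```python
# drop the columns that currently break the save, then write as usual
df.drop("geoip").drop("tags").write.mode("overwrite").json("somefilename")
```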
This work is copyright (c) 2020, eQualit.ie inc., and is licensed under a Creative Commons Attribution 4.0 International License.