_
_,ノ;:。-:ヽ
 ̄`ヽ .l|
ノ_ノl _,,.,.,..,..,.,..,.,.,,_
/;;;:ノ:´;'::;'::;'::;'::;'::;'::;'`>:"7
_l "ー''""´´""""""´゙ヾ:/
、 `゙''''ーー~ー--―ー―''" ̄ ̄
 ̄ ― _ 二 ー
Grebe forwards data-like string messages from RabbitMQ to Clickhouse.
- Receives JSON, JSON Lines, or CSV-like strings as messages from a specified RabbitMQ queue.
- Parses and converts them, and creates the destination table if needed, using Lake Weed. Each attribute is mapped to a column of the Clickhouse table.
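To illustrate how attributes could become columns, here is a minimal sketch that flattens a nested JSON message into column/value pairs. It assumes nested keys are joined with a double underscore, which is what the `location__longitude` and `location__latitude` field names in the type file example suggest. `flatten_message` is a hypothetical helper, not Lake Weed's implementation.

```python
import json
from typing import Any, Dict


def flatten_message(obj: Dict[str, Any], sep: str = "__", prefix: str = "") -> Dict[str, Any]:
    """Flatten nested JSON objects into one level of column names.

    Hypothetical helper: assumes nested keys are joined with a double
    underscore, as the field names in the type file example suggest.
    """
    columns: Dict[str, Any] = {}
    for key, value in obj.items():
        name = f"{prefix}{sep}{key}" if prefix else key
        if isinstance(value, dict):
            # Recurse into nested objects, extending the column-name prefix.
            columns.update(flatten_message(value, sep, name))
        else:
            columns[name] = value
    return columns


message = '{"city": "Tokyo", "temperature": 18.5, "location": {"longitude": 139.69, "latitude": 35.69}}'
print(flatten_message(json.loads(message)))
# {'city': 'Tokyo', 'temperature': 18.5, 'location__longitude': 139.69, 'location__latitude': 35.69}
```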
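Install the dependencies and make the script executable:
$ pip install -r requirements.txt
$ chmod +x grebe.py
Run Grebe, subscribing to the queue nayco with RabbitMQ and Clickhouse both on 192.168.11.200:
$ ./grebe.py nayco -mh 192.168.11.200 -dh 192.168.11.200
Run the tests:
$ pytest
Show the help:
$ ./grebe.py -h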
usage: grebe.py [-h] [-mh MH] [-mp MP] [-dh DH] [-dp DP] [-dn DN]
                [--schema-store {local,rdb}]
                [--local-schema-dir LOCAL_SCHEMA_DIR]
                [--local-source-settings-file LOCAL_SOURCE_SETTINGS_FILE]
                [--type-file TYPE_FILE]
                [--tz TZ]
                [--api-port API_PORT]
                [--log-level {DEBUG,INFO,WARN,ERROR}]
                [--log-format LOG_FORMAT] [--log-file LOG_FILE]
                [--log-file-count LOG_FILE_COUNT]
                [--log-file-size LOG_FILE_SIZE]
                [--retry-max-count RETRY_MAX_COUNT]
                queue_name

Forward Data-like string message from RabbitMQ to Clickhouse

positional arguments:
  queue_name            Queue name to subscribe on RabbitMQ

optional arguments:
  -h, --help            show this help message and exit
  -mh MH                RabbitMQ host
  -mp MP                RabbitMQ port
  -dh DH                Clickhouse host
  -dp DP                Clickhouse port by native connection
  -dn DN                Clickhouse DB name to store data
  --schema-store {local,rdb}
                        Schema store location
  --local-schema-dir LOCAL_SCHEMA_DIR
                        Schema DB directory path when schema-store is local
  --local-source-settings-file LOCAL_SOURCE_SETTINGS_FILE
                        Path to source settings as local file. If this parameter is skipped, source settings will be created on DB
  --tz TZ               Timezone string will be used as default offset in parsing source string if it has no offset
  --api-port API_PORT   Port number of grebe Web API. It is disabled if this is not provided.
  --log-level {DEBUG,INFO,WARN,ERROR}
                        Log level
  --log-format LOG_FORMAT
                        Log format by 'logging' package
  --log-file LOG_FILE   Log file path
  --log-file-count LOG_FILE_COUNT
                        Log file keep count
  --log-file-size LOG_FILE_SIZE
                        Size of each log file
  --retry-max-count RETRY_MAX_COUNT
                        Max count of retry to processing. Message is discarded when max count is exceeded.
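The --log-* options line up with Python's standard logging package. A minimal sketch of how they could be wired together, assuming --log-file-size is in bytes and --log-file-count is the number of rotated files to keep; `setup_logger` is a hypothetical helper, not Grebe's own code. The default format and sizes are the values shown in the arguments example of the Web API section.

```python
import logging
from logging.handlers import RotatingFileHandler
from typing import Optional

# Format string copied from the arguments example returned by the Web API.
DEFAULT_FORMAT = "[%(levelname)s] %(asctime)s | %(pathname)s(L%(lineno)s) | %(message)s"


def setup_logger(
    name: str = "grebe",
    log_level: str = "INFO",
    log_format: str = DEFAULT_FORMAT,
    log_file: Optional[str] = None,
    log_file_count: int = 1000,
    log_file_size: int = 1000000,
) -> logging.Logger:
    """Hypothetical sketch: map the --log-* options onto the logging package."""
    logger = logging.getLogger(name)
    # "DEBUG", "INFO", "WARN" and "ERROR" are all attributes of the logging module.
    logger.setLevel(getattr(logging, log_level))
    if log_file:
        # Assumption: rotate once a file reaches log_file_size bytes
        # and keep log_file_count old files.
        handler: logging.Handler = RotatingFileHandler(
            log_file, maxBytes=log_file_size, backupCount=log_file_count
        )
    else:
        # Without --log-file, write to stderr.
        handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter(log_format))
    logger.addHandler(handler)
    return logger
```

Usage: `setup_logger(log_level="DEBUG", log_file="grebe.log").info("started")`.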
Source settings specify the type of each field, per source. This feature is provided by Lake Weed.

General format:

<source_id>:
  types:
    <field_name>: <type>
    <field_name>: <type>
<source_id>:
  types:
    <field_name>: <type>
    <field_name>: <type>
...

Example:

weather:
  types:
    city: string
    city_code: int
    temperature: double
    location__longitude: double
    location__latitude: double
...
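A minimal sketch of what declared types could mean for an incoming record: each field with a declared type is cast, and other fields pass through. `CONVERTERS` and `apply_types` are hypothetical; how Lake Weed actually converts values, and what it does with fields that have no declared type, is not described here, so this sketch simply leaves them unchanged.

```python
from typing import Any, Callable, Dict

# Hypothetical converters for the type names used in the example above.
# Lake Weed's real conversion rules and full list of type names may differ.
CONVERTERS: Dict[str, Callable[[Any], Any]] = {
    "string": str,
    "int": int,
    "double": float,
}

# The "weather" source settings from the example, as they would look once loaded.
SOURCE_SETTINGS = {
    "weather": {
        "types": {
            "city": "string",
            "city_code": "int",
            "temperature": "double",
            "location__longitude": "double",
            "location__latitude": "double",
        }
    }
}


def apply_types(source_id: str, record: Dict[str, Any]) -> Dict[str, Any]:
    """Cast each field that has a declared type; leave other fields untouched."""
    types = SOURCE_SETTINGS.get(source_id, {}).get("types", {})
    typed = {}
    for field, value in record.items():
        type_name = types.get(field)
        typed[field] = CONVERTERS[type_name](value) if type_name in CONVERTERS else value
    return typed


print(apply_types("weather", {"city": "Tokyo", "city_code": "13", "temperature": "18"}))
# {'city': 'Tokyo', 'city_code': 13, 'temperature': 18.0}
```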
This feature is provided by Lake Weed.
Grebe provides a Web API when the --api-port argument is specified with a port number.
When you launch Grebe with --api-port 8888, you can access http://localhost:8888/ and the message "Grebe is running." is shown.
Grebe just shows the message "Grebe is running."
Example: (200)
Grebe is running.
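A liveness check only needs the root URL and the message above. The following sketch uses the standard library; `is_grebe_running` is a hypothetical helper, and the default URL assumes Grebe was started with --api-port 8888 on the local machine.

```python
import urllib.request


def is_grebe_running(base_url: str = "http://localhost:8888/", timeout: float = 3.0) -> bool:
    """Return True if the Grebe Web API answers 200 with its running message."""
    try:
        with urllib.request.urlopen(base_url, timeout=timeout) as resp:
            status = resp.status
            body = resp.read().decode("utf-8")
    except OSError:
        # URLError and HTTPError subclass OSError: connection refused,
        # timeouts and non-2xx responses all count as "not running".
        return False
    return status == 200 and "Grebe is running." in body
```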
Shows the command-line arguments in JSON format.
Example: (200)
{"api_port":8888,"dh":"localhost","dp":9500,"local_schema_dir":"schemas","local_source_settings_file":"","log_file":null,"log_file_count":1000,"log_file_size":1000000,"log_format":"[%(levelname)s] %(asctime)s | %(pathname)s(L%(lineno)s) | %(message)s","log_level":"INFO","mh":"localhost","mp":5672,"queue_name":"nayco","retry_max_count":3,"schema_store":"rdb","tz":"Asia/Tokyo"}
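When checking which RabbitMQ and Clickhouse instances a running Grebe is attached to, the keys in this response are enough. A small sketch that takes the response body as a string (the endpoint path is not given here, so fetching it is left out); `connection_summary` is a hypothetical helper that uses only the `mh`, `mp`, `dh`, `dp` and `queue_name` keys shown in the example.

```python
import json
from typing import Dict


def connection_summary(args_json: str) -> Dict[str, str]:
    """Pick the RabbitMQ and Clickhouse endpoints out of the arguments JSON."""
    args = json.loads(args_json)
    return {
        "rabbitmq": f"{args['mh']}:{args['mp']}",
        "clickhouse": f"{args['dh']}:{args['dp']}",
        "queue": args["queue_name"],
    }


sample = '{"dh":"localhost","dp":9500,"mh":"localhost","mp":5672,"queue_name":"nayco"}'
print(connection_summary(sample))
# {'rabbitmq': 'localhost:5672', 'clickhouse': 'localhost:9500', 'queue': 'nayco'}
```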
Shows the current schema cache of Grebe. The schema cache maps each source (AMQP topic name) and message schema to a destination table name.
Example: there are 2 schema cache entries. (200)
[
  {
    "schema": {
      "status": "String"
    },
    "source": "rabbitmq_stat_aliveness-test",
    "table": "rabbitmq_stat_aliveness-test_001"
  },
  {
    "schema": {
      "arguments": "String",
      "auto_delete": "UInt8",
      "durable": "UInt8",
      "internal": "UInt8",
      "name": "String",
      "type": "String",
      "user_who_performed_action": "String",
      "vhost": "String"
    },
    "source": "rabbitmq_stat_exchanges",
    "table": "rabbitmq_stat_exchanges_001"
  }
]
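To see where each source ends up, the schema cache response can be grouped by source. Tables are collected in a list because the `_001` suffix hints that one source may map to several tables (for example after a schema change); that is an assumption, not documented behaviour. `tables_by_source` is a hypothetical helper.

```python
import json
from typing import Dict, List


def tables_by_source(schema_cache_json: str) -> Dict[str, List[str]]:
    """Group destination table names by source (AMQP topic name)."""
    result: Dict[str, List[str]] = {}
    for entry in json.loads(schema_cache_json):
        result.setdefault(entry["source"], []).append(entry["table"])
    return result


sample = '[{"schema": {"name": "String"}, "source": "rabbitmq_stat_exchanges", "table": "rabbitmq_stat_exchanges_001"}]'
print(tables_by_source(sample))
# {'rabbitmq_stat_exchanges': ['rabbitmq_stat_exchanges_001']}
```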
Reloads the schema cache from the schema store.
Example: all schemas reloaded successfully. (200)
{"result":"Success","schema_count":66,"store":"<class 'grebe.schema_store_clickhouse.SchemaStoreClickhouse'>"}
Example: failed to reload schemas. (500)
{"result":"Failed","stack_trace":"...traceback.format_exc()..."}
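Both reload endpoints (schemas and source_settings) answer with a `result` field and, on failure, a `stack_trace`. A minimal sketch of turning such a response into a Python exception so that scripts fail loudly; `ReloadFailed` and `check_reload_response` are hypothetical names, and the caller is assumed to pass in the HTTP status and body it received.

```python
import json


class ReloadFailed(RuntimeError):
    """Raised when a reload endpoint reports "Failed"."""


def check_reload_response(status: int, body: str) -> dict:
    """Return the parsed payload on success, otherwise raise with the server traceback."""
    payload = json.loads(body)
    if status != 200 or payload.get("result") != "Success":
        # The 500 response carries the server-side traceback in "stack_trace".
        raise ReloadFailed(payload.get("stack_trace", "no stack trace"))
    return payload
```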
Shows the current source_settings cache of Grebe. The source_settings cache holds the settings of each source (AMQP topic name).
Example: there are 2 source_settings entries. (200)
[
  {
    "source_id": "weather",
    "source_settings": {
      "types": {
        "city": "string",
        "city_code": "int",
        "location__latitude": "double",
        "location__longitude": "double",
        "temperature": "double"
      }
    }
  },
  {
    "source_id": "rabbitmq_stat_aliveness-test",
    "source_settings": {
      "types": {
        "now": "datetime",
        "status": "string"
      }
    }
  }
]
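To answer "what type is this field declared as?", the source_settings cache response can be indexed by source. `field_types` and `type_of` are hypothetical helpers built only on the `source_id`, `source_settings` and `types` keys shown in the example.

```python
import json
from typing import Dict, Optional


def field_types(source_settings_json: str) -> Dict[str, Dict[str, str]]:
    """Index the source_settings cache response as {source_id: {field_name: type}}."""
    return {
        entry["source_id"]: entry["source_settings"].get("types", {})
        for entry in json.loads(source_settings_json)
    }


def type_of(index: Dict[str, Dict[str, str]], source_id: str, field: str) -> Optional[str]:
    """Return the declared type, or None when the source or field has no setting."""
    return index.get(source_id, {}).get(field)


sample = '[{"source_id": "weather", "source_settings": {"types": {"city_code": "int"}}}]'
index = field_types(sample)
print(type_of(index, "weather", "city_code"))  # int
print(type_of(index, "weather", "humidity"))   # None
```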
Reloads the source_settings cache.
Example: all source settings reloaded successfully. (200)
{"result":"Success","store":"<class 'grebe.source_setting_store_clickhouse.SourceSettingStoreClickhouse'>"}
Example: Failed to reload. (500)
{"result":"Failed","stack_trace":"...traceback.format_exc()..."}
To release a version, tag it and push the tags:
git tag vX.Y.Z
git push origin master --tags
- Fork it ( https://github.com/tac0x2a/grebe/fork )
- Create your feature branch (git checkout -b my-new-feature)
- Commit your changes (git commit -am 'Add some feature')
- Push to the branch (git push origin my-new-feature)
- Create a new Pull Request