Skip to content

Commit

Permalink
Allow custom headers in Python HTTP transport.
Browse files Browse the repository at this point in the history
Signed-off-by: Jakub Dardzinski <[email protected]>
  • Loading branch information
JDarDagran committed Sep 24, 2024
1 parent 9e94fea commit d4db05d
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 1 deletion.
7 changes: 7 additions & 0 deletions client/python/openlineage/client/transport/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 87,8 @@ class HttpConfig(Config):
session: Session | None = attr.ib(default=None)
# not set by TransportFactory
adapter: HTTPAdapter | None = attr.ib(default=None)
# custom headers support
custom_headers: dict[str, str] = attr.ib(factory=dict)

@classmethod
def from_dict(cls, params: dict[str, Any]) -> HttpConfig:
Expand Down Expand Up @@ -150,6 152,7 @@ def __init__(self, config: HttpConfig) -> None:
self.session.headers["Content-Type"] = "application/json"
auth_headers = self._auth_headers(config.auth)
self.session.headers.update(auth_headers)
self.session.headers.update(config.custom_headers)
self.timeout = config.timeout
self.verify = config.verify
self.compression = config.compression
Expand All @@ -168,6 171,9 @@ def emit(self, event: Event) -> Response:
http_client.HTTPConnection.debuglevel = 0
body, headers = self._prepare_request(Serde.to_json(event))

# Update headers with custom headers from the config
headers.update(self.config.custom_headers)

if self.session:
resp = self.session.post(
url=urljoin(self.url, self.endpoint),
Expand All @@ -179,6 185,7 @@ def emit(self, event: Event) -> Response:
else:
headers["Content-Type"] = "application/json"
headers.update(self._auth_headers(self.config.auth))
headers.update(self.config.custom_headers)
with Session() as session:
resp = session.post(
url=urljoin(self.url, self.endpoint),
Expand Down
120 changes: 119 additions & 1 deletion client/python/tests/test_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 4,14 @@

import datetime
import gzip
import os
from typing import TYPE_CHECKING
from unittest.mock import MagicMock, patch

from openlineage.client import OpenLineageClient
from openlineage.client.run import Job, Run, RunEvent, RunState
from openlineage.client.serde import Serde
from openlineage.client.transport.http import HttpCompression, HttpConfig, HttpTransport
from openlineage.client.transport.http import ApiKeyTokenProvider, HttpCompression, HttpConfig, HttpTransport
from openlineage.client.uuid import generate_new_uuid
from requests import Session

Expand Down Expand Up @@ -163,3 165,119 @@ def test_http_config_configs_session() -> None:
assert not hasattr(config.auth, "api_key")
assert config.session is s
assert config.adapter is None


@patch("requests.Session.post")
def test_http_transport_custom_headers_applied(mock_post):
custom_headers = {"X-Custom-Header": "CustomValue", "X-Another-Header": "AnotherValue"}

config = HttpConfig(url="http://example.com", custom_headers=custom_headers)

transport = HttpTransport(config)
mock_event = MagicMock()

with patch("openlineage.client.serde.Serde.to_json", return_value='{"mock": "event"}'):
transport.emit(mock_event)

mock_post.assert_called_once()
_, kwargs = mock_post.call_args
headers = kwargs["headers"]

assert headers["X-Custom-Header"] == "CustomValue"
assert headers["X-Another-Header"] == "AnotherValue"


@patch("requests.Session.post")
def test_http_transport_auth_and_custom_headers_applied(mock_post):
custom_headers = {"X-Custom-Header": "CustomValue"}
auth_token = "Bearer test_token" # noqa: S105

# Set up config with an ApiKeyTokenProvider
config = HttpConfig(
url="http://example.com",
auth=ApiKeyTokenProvider({"api_key": "test_token"}),
custom_headers=custom_headers,
)

transport = HttpTransport(config)
mock_event = MagicMock()

with patch("openlineage.client.serde.Serde.to_json", return_value='{"mock": "event"}'):
transport.emit(mock_event)

mock_post.assert_called_once()
_, kwargs = mock_post.call_args
headers = kwargs["headers"]

assert headers["Authorization"] == auth_token
assert headers["X-Custom-Header"] == "CustomValue"


@patch("requests.Session.post")
def test_http_transport_no_custom_headers(mock_post):
config = HttpConfig(url="http://example.com")
transport = HttpTransport(config)
mock_event = MagicMock()

with patch("openlineage.client.serde.Serde.to_json", return_value='{"mock": "event"}'):
transport.emit(mock_event)

mock_post.assert_called_once()
_, kwargs = mock_post.call_args
headers = kwargs["headers"]

# Only the Content-Type header should be set if no custom headers
assert headers["Content-Type"] == "application/json"
assert "X-Custom-Header" not in headers


@patch("requests.Session.post")
def test_http_transport_compression_with_custom_headers(mock_post):
custom_headers = {"X-Custom-Header": "CustomValue"}

config = HttpConfig(
url="http://example.com", compression=HttpCompression.GZIP, custom_headers=custom_headers
)
transport = HttpTransport(config)
mock_event = MagicMock()

with patch("openlineage.client.serde.Serde.to_json", return_value='{"mock": "event"}'), patch(
"gzip.compress", return_value=b"compressed_data"
):
transport.emit(mock_event)

mock_post.assert_called_once()
_, kwargs = mock_post.call_args
headers = kwargs["headers"]
data = kwargs["data"]

assert headers["X-Custom-Header"] == "CustomValue"
assert headers["Content-Encoding"] == "gzip"
assert data == b"compressed_data"


@patch("requests.Session.post")
@patch.dict(
os.environ,
{
"OPENLINEAGE__TRANSPORT__TYPE": "http",
"OPENLINEAGE__TRANSPORT__URL": "http://example.com",
"OPENLINEAGE__TRANSPORT__CUSTOM_HEADERS__CUSTOM_HEADER": "FIRST",
"OPENLINEAGE__TRANSPORT__CUSTOM_HEADERS__ANOTHER_HEADER": "second",
},
)
def test_http_transport_with_custom_headers_from_env_vars(mock_post):
transport = OpenLineageClient().transport
mock_event = MagicMock()

with patch("openlineage.client.serde.Serde.to_json", return_value='{"mock": "event"}'), patch(
"gzip.compress", return_value=b"compressed_data"
):
transport.emit(mock_event)

mock_post.assert_called_once()
_, kwargs = mock_post.call_args
headers = kwargs["headers"]

assert headers["custom_header"] == "FIRST"
assert headers["another_header"] == "second"
1 change: 1 addition & 0 deletions website/docs/client/python.md
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 206,7 @@ Allows sending events to HTTP endpoint, using [requests](https://requests.readth
- `type` - string specifying the "api_key" or the fully qualified class name of your TokenProvider. Required if `auth` is provided.
- `apiKey` - string setting the Authentication HTTP header as the Bearer. Required if `type` is `api_key`.
- `compression` - string, name of algorithm used by HTTP client to compress request body. Optional, default value `null`, allowed values: `gzip`. Added in v1.13.0.
- `custom_headers` - dictionary of additional headers to be sent with each request. Optional, default: `{}`.

#### Behavior

Expand Down

0 comments on commit d4db05d

Please sign in to comment.