Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
d34edd4
initial
ansoniS1 Jan 4, 2024
15c0403
remove Oauth2 settings from print_useful_settings
ansoniS1 Jan 10, 2024
57d50cc
bump otel libs to latest
ansoniS1 Jan 10, 2024
e3aca57
get rid of opentelemetry library
ansoniS1 Jan 24, 2024
bc0b572
add requirement
ansoniS1 Jan 24, 2024
22f3bb9
update requirements
ansoniS1 Jan 24, 2024
5c3f188
change import
ansoniS1 Jan 24, 2024
00116f6
update generated protobuf OTEL files
ansoniS1 Jan 24, 2024
069ef52
update import
ansoniS1 Jan 24, 2024
19eca12
try alternative import style
ansoniS1 Jan 24, 2024
ae430bb
move import
ansoniS1 Jan 25, 2024
d9c9779
Integration tests and kuberentes_monitor / k8s implementation
alesnovak-s1 Jan 26, 2024
9462017
Prefix oauth configurations
ansoniS1 Jan 29, 2024
1ffd6a2
rename methods to use oauth_ prefix
ansoniS1 Jan 29, 2024
1884372
Re-factor client methods to util
ansoniS1 Jan 29, 2024
0217af5
first test
ansoniS1 Jan 29, 2024
8b163f0
update reference to ssl
ansoniS1 Jan 29, 2024
a4bf7e1
remove `__` from __receive_response_status
ansoniS1 Jan 29, 2024
8431a88
DTIN-3315 (#1223)
alesnovak-s1 Jan 8, 2024
5fe0df4
Removing unused queue import, which is not supported in python2. conc…
alesnovak-s1 Jan 10, 2024
1e2c4ce
Project import generated by Copybara. (#1232)
alesnovak-s1 Jan 11, 2024
e17c449
Fixing windows unit tests (#1234)
alesnovak-s1 Jan 17, 2024
447682d
EventID Parsing (#1235)
alesnovak-s1 Jan 18, 2024
c5246b5
DTIN-2346: Set monitor attribute when message_log_template is used (#…
jmakar-s1 Jan 22, 2024
b366b54
Integration tests and kuberentes_monitor / k8s implementation
alesnovak-s1 Jan 26, 2024
238c883
Update new LogWatcher `add_log_config` signature into unit-tests
ansoniS1 Jan 29, 2024
7c56dde
Merge branch 'master' into otlp_2
ansoniS1 Jan 29, 2024
393f6e2
Revert "EventID Parsing (#1235)"
ansoniS1 Jan 30, 2024
5e228ea
Revert "Fixing windows unit tests (#1234)"
ansoniS1 Jan 30, 2024
ef62d8f
Revert "Project import generated by Copybara. (#1232)"
ansoniS1 Jan 30, 2024
e586a03
Revert "Removing unused queue import, which is not supported in pytho…
ansoniS1 Jan 30, 2024
c39a1b4
revert all the changes (very messy)
ansoniS1 Jan 30, 2024
b709d12
revert all the changes (very messy)
ansoniS1 Jan 30, 2024
1036729
Enable OAuth/OTLP configs to be EnvAware
ansoniS1 Jan 30, 2024
adea362
add opentelemetry thirdparty to our docker container
ansoniS1 Jan 30, 2024
116086b
add reference worker_config server_url
ansoniS1 Jan 30, 2024
4c26218
fix worker config reference
ansoniS1 Jan 30, 2024
a9042ab
fix flow with selecting server_url
ansoniS1 Jan 31, 2024
42e35ce
remove requirement for scalyr_server to be set (server_url is adequate)
ansoniS1 Feb 13, 2024
08c913a
Handle implicit ports
ansoniS1 Feb 13, 2024
a4e986e
allow server_url or scalyr_server for setting otlp endpoint
ansoniS1 Feb 15, 2024
e20f0dc
add debug
ansoniS1 Feb 15, 2024
a615853
implement some statistics
ansoniS1 Feb 16, 2024
05dbfd0
add basic auth support
ansoniS1 Feb 27, 2024
6362d2b
fix header typo
ansoniS1 Feb 27, 2024
81278d9
add rename_logfile support
ansoniS1 Mar 11, 2024
b851c34
likely fix the rename_logfile support
ansoniS1 Mar 18, 2024
b7d7a30
Merge remote-tracking branch 'origin/master' into otlp_2
ansoniS1 Mar 18, 2024
e4a076e
Merge remote-tracking branch 'origin/master' into otlp_2
ansoniS1 Mar 21, 2024
af71d6c
add opentelemetry as thirdparty in aio
ansoniS1 Mar 26, 2024
af50ba2
fix handling of session_info and log/thread
ansoniS1 Mar 26, 2024
75334ef
Fix order of attribute encoding
ansoniS1 Mar 27, 2024
300584f
quick test
ansoniS1 Apr 2, 2024
62dc253
add log and thread last
ansoniS1 Apr 2, 2024
d627f14
avoid ambiguous resolution between event and session variables
ansoniS1 Apr 4, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions agent_build_refactored/container_images/image_builders.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,8 @@ def create_agent_filesystem(
for third_party_lib in third_party_lis_root.iterdir():
if third_party_lib.name == "tcollector":
continue
if third_party_lib.name == "opentelemetry":
continue
if third_party_lib.is_dir():
shutil.rmtree(third_party_lib)
if third_party_lib.is_file():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -848,6 +848,10 @@ def _build_package_root(self):
SOURCE_ROOT / "scalyr_agent/third_party/tcollector",
third_party_libs_dir / "tcollector",
)
shutil.copytree(
SOURCE_ROOT / "scalyr_agent/third_party/opentelemetry",
third_party_libs_dir / "opentelemetry",
)
shutil.copy2(
SOURCE_ROOT / "scalyr_agent/third_party/__init__.py",
third_party_libs_dir / "__init__.py",
Expand Down
3 changes: 3 additions & 0 deletions dev-requirements-new.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ python-dateutil==2.8.2
repoze.lru==0.7
six==1.14.0

# Required for opentelemetry support.
protobuf==3.17.3

# Required for redis monitor.
redis==2.10.5

Expand Down
2 changes: 1 addition & 1 deletion scalyr_agent/agent_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -597,7 +597,7 @@ def __perform_config_checks(self, no_check_remote):
)

# Send a test message to the server to make sure everything works. If not, print a decent error message.
if not no_check_remote or self.__config.use_new_ingestion:
if self.__config.transport == "scalyr" and (not no_check_remote or self.__config.use_new_ingestion):
client = create_client(self.__config, quiet=True)
try:
ping_result = client.ping()
Expand Down
97 changes: 97 additions & 0 deletions scalyr_agent/client_auth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import datetime
import json
import urllib
from base64 import b64encode

from urllib.parse import urlparse

import scalyr_agent.scalyr_logging as scalyr_logging

import requests

log = scalyr_logging.getLogger(__name__)


class ClientAuth(object):
def __init__(self, configuration, headers):
log.setLevel(configuration.debug_level)
self.configuration = configuration
self.headers = headers
if self.configuration.auth == "oauth2":
self.auth = OAuth2(self.configuration, self.headers)
elif self.configuration.auth == "bearer":
self.auth = BearerToken(self.configuration, self.headers)
elif self.configuration.auth == "basic":
self.auth = Basic(self.configuration, self.headers)
else:
self.auth = NoAuth(self.configuration, self.headers)

def authenticate(self):
return self.auth.authenticate()

class NoAuth(object):
def __init__(self, configuration, headers):
self.headers = headers

def authenticate(self):
# Add a header just so for traceability
self.headers["x-no-custom-auth"]="true"
return True

# Simple Authorization Header as Bearer Token using `api_key` configuration
class BearerToken(object):
def __init__(self, configuration, headers):
headers.set("Authorization", "Bearer " + configuration.api_key)

def authenticate(self):
return True

class Basic(object):
def __init__(self, configuration, headers):
authorization_header_value = b64encode(
bytes(('' + configuration.basic_username + ":" + configuration.basic_password).encode("utf-8"))).decode('utf-8')
headers["Authorization"]="Basic " + authorization_header_value

def authenticate(self):
return True

# Implement https://datatracker.ietf.org/doc/html/rfc6749#section-4.4 flow
class OAuth2(object):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't there a python library for OAuth2 to avoid having an in-house implementation?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Python2 requirement paints us into some really weird corners like having to be on older versions of code that opens up security risks. This seems like the path of least resistance. I originally used the OpenTelemetry Python SDK, but had to back out of that because of no Python2 support.

def __init__(self, configuration, headers):
self.headers = headers # Headers modified for external requests
self.client_id = configuration.oauth_client_id
self.client_secret = configuration.oauth_client_secret
self.token_url = configuration.oauth_token_url
scopes = " ".join(configuration.oauth_scopes)
# Payload Body for the token exchange
self.auth_request = "grant_type=client_credentials&scope=" + urllib.parse.quote(scopes)
# Our token to use in requests
self.token = None
# When the token expires
self.expiry_time = datetime.datetime.now()
self.verify_ssl = configuration.verify_server_certificate
# Authentication Headers for the token exchange
authorization_header_value = b64encode( bytes(('' + self.client_id + ':' + self.client_secret).encode("utf-8"))).decode('utf-8')
self.auth_headers = { "Content-Type": "application/x-www-form-urlencoded", "Authorization" : "Basic " + authorization_header_value}

def authenticate(self):
if self.token == None or self.expiry_time < datetime.datetime.now():
log.info("Request/Refresh OAuth2 Token")
if not self.refresh_token():
raise Exception("OAuth2: Unable to refresh token")
self.headers["Authorization"]="Bearer " + self.token
return True

def refresh_token(self):
log.log(scalyr_logging.DEBUG_LEVEL_1, "OAuth2 Token Request to %s, body: %s, headers: %s" % (self.token_url, self.auth_request, self.headers))
resp = requests.post(self.token_url, data=self.auth_request, headers=self.auth_headers, verify=self.verify_ssl)
if resp.status_code == 200:
log.log(scalyr_logging.DEBUG_LEVEL_1, "OAUTH Response: %s" % (resp.content))
auth_response = json.loads(resp.content)
self.token = auth_response["access_token"]
self.expiry_time = datetime.datetime.now() + datetime.timedelta(seconds=auth_response["expires_in"])
log.log(scalyr_logging.DEBUG_LEVEL_1, "OAUTH Token: %s (expires: %s)" % (self.token, self.expiry_time))
return True
else:
raise Exception("Unable to obtain OAuth2 Token: " + str(resp.status_code) + "(" + str(resp.content) + ")")
return False
108 changes: 107 additions & 1 deletion scalyr_agent/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -766,6 +766,8 @@ def print_useful_settings(self, other_config=None):
"default_sessions_per_worker",
"default_worker_session_status_message_interval",
"enable_worker_session_process_metrics_gather",
"server_url"
"transport",
# NOTE: It's important we use sanitzed_ version of this method which masks the API key
"sanitized_worker_configs",
]
Expand Down Expand Up @@ -1498,6 +1500,51 @@ def api_key(self):
"""Returns the configuration value for 'api_key'."""
return self.__get_config().get_string("api_key")

@property
def server_url(self):
"""Returns the configuration value for 'server_url'."""
return self.__get_config().get_string("server_url")

@property
def auth(self):
"""Returns the configuration value for 'auth'."""
return self.__get_config().get_string("auth")

@property
def oauth_client_id(self):
"""Returns the configuration value for 'oauth_client_id'."""
return self.__get_config().get_string("oauth_client_id")

@property
def oauth_client_secret(self):
"""Returns the configuration value for 'oauth_client_secret'."""
return self.__get_config().get_string("oauth_client_secret")

@property
def oauth_token_url(self):
"""Returns the configuration value for 'oauth_token_url'."""
return self.__get_config().get_string("oauth_token_url")

@property
def oauth_scopes(self):
"""Returns the configuration value for 'oauth_scopes'."""
return self.__get_config().get_json_array("oauth_scopes")

@property
def basic_username(self):
"""Returns the configuration value for 'basic_username'."""
return self.__get_config().get_string("basic_username")

@property
def basic_password(self):
"""Returns the configuration value for 'basic_password'."""
return self.__get_config().get_string("basic_password")

@property
def transport(self):
"""Returns the configuration value for 'transport'."""
return self.__get_config().get_string("transport")

@property
def scalyr_server(self):
"""Returns the configuration value for 'scalyr_server'."""
Expand Down Expand Up @@ -2179,6 +2226,10 @@ def __set_api_key(self, config, api_key):
if api_key:
config.put("api_key", api_key)

# Ingore api_key check if we are not writing to Scalyr/DataSet/SDL
if "transport" in config and config.get_string("transport") != "scalyr":
return

if "api_key" not in config:
raise BadConfiguration(
'The configuration file is missing the required field "api_key" that '
Expand Down Expand Up @@ -2264,6 +2315,36 @@ def __verify_main_config_and_apply_defaults(
self.__verify_or_set_optional_string(
config, "api_key", "", description, apply_defaults, env_aware=True
)
self.__verify_or_set_optional_string(
config, "transport", "scalyr", description, apply_defaults, env_aware=True
)
self.__verify_or_set_optional_string(
config, "auth", "", description, apply_defaults, env_aware=True
)
self.__verify_or_set_optional_string(
config, "oauth_client_id", "", description, apply_defaults, env_aware=True
)
self.__verify_or_set_optional_string(
config, "oauth_client_secret", "", description, apply_defaults, env_aware=True
)
self.__verify_or_set_optional_string(
config, "oauth_token_url", "", description, apply_defaults, env_aware=True
)
self.__verify_or_set_optional_array_of_strings(
config,
"oauth_scopes",
[],
description,
apply_defaults,
separators=[None, ","],
env_aware=True,
)
self.__verify_or_set_optional_string(
config, "basic_username", "", description, apply_defaults, env_aware=True
)
self.__verify_or_set_optional_string(
config, "basic_password", "", description, apply_defaults, env_aware=True
)
self.__verify_or_set_optional_bool(
config, "allow_http", False, description, apply_defaults, env_aware=True
)
Expand Down Expand Up @@ -2827,6 +2908,27 @@ def __verify_main_config_and_apply_defaults(
apply_defaults,
env_aware=True,
)
self.__verify_or_set_optional_string(
config,
"transport",
"scalyr",
description,
apply_defaults
)
self.__verify_or_set_optional_string(
config,
"server_url",
"",
description,
apply_defaults
)
self.__verify_or_set_optional_string(
config,
"auth",
"",
description,
apply_defaults
)
self.__verify_or_set_optional_bool(
config,
"use_new_ingestion",
Expand Down Expand Up @@ -4200,10 +4302,14 @@ def __verify_workers_entry_and_set_defaults(self, worker_entry, entry_index=None
worker_entry, "api_key", description % entry_index
)

# Only use scalyr_server if our transport is scalyr (but still use it if we don't get a server_url)
default_server_url = self.scalyr_server
if self.transport != "scalyr" and self.server_url is not None and self.server_url != "":
default_server_url = self.server_url
self.__verify_or_set_optional_string(
worker_entry,
"server_url",
default_value=self.scalyr_server,
default_value=default_server_url,
config_description=description % entry_index,
)

Expand Down
14 changes: 10 additions & 4 deletions scalyr_agent/copying_manager/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,8 @@ def __init__(self, add_events_request, completion_callback):
# If there is a AddEventsTask object already created for the next request due to pipelining, this is set to it.
# This must be the next request if this request is successful, otherwise, we will lose bytes.
self.next_pipelined_task = None

# last status
self.receive_response_status = ()

class CopyingManagerWorkerSessionInterface(six.with_metaclass(ABCMeta)):
"""
Expand Down Expand Up @@ -489,9 +490,10 @@ def run(self):
# on the ground and advance.
if current_time - last_success > self.__config.max_retry_time:
if self.__pending_add_events_task is not None:

if (
"parseResponseFailed"
in self.__pending_add_events_task.__receive_response_status
in self.__pending_add_events_task.receive_response_status
):
log.error(
"Repeatedly failed to parse response due to exception. Dropping events",
Expand Down Expand Up @@ -586,7 +588,7 @@ def run(self):
full_response = ""
else:
(result, bytes_sent, full_response) = get_response()
self.__pending_add_events_task.__receive_response_status = (
self.__pending_add_events_task.receive_response_status = (
result
)
blocking_response_time_end = time.time()
Expand Down Expand Up @@ -1226,8 +1228,12 @@ def _init_scalyr_client(self, quiet=False):
"""
api_key = self.__worker_config_entry["api_key"]
if self.__config.use_new_ingestion:

self.__new_scalyr_client = create_new_client(self.__config, api_key=api_key)
elif self.__config.transport == "otlp":
from scalyr_agent.otlp_client import (
create_otlp_client,
)
self.__scalyr_client = create_otlp_client(self.__config, self.__worker_config_entry)
else:
self.__scalyr_client = create_client(
self.__config,
Expand Down
Loading