-
Notifications
You must be signed in to change notification settings - Fork 11
add support for eainfo streams #46
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
|
Hi Derek @djw8605 Also, @kazeborja lets sit together with Derek to see how we can benefit from common goals of both osg and wlcg. Specifically, regarding deployment architectures and new developments weather we re-write a go collector or continue improving the existing python implemenations and if its really necessary for our needs to fork the project. |
|
This looks good, I presume it is tested? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR adds support for decoding and parsing eainfo streams (scitags) containing activity and experiment codes from XRootD monitoring data. The implementation uses a JSON mapping file from the scitags API to translate numeric codes to human-readable experiment and activity names.
- Added
eaInfoparsing function in the decoding module to extract and decode scitags information - Integrated eainfo processing into the collector to capture experiment and activity data for user sessions
- Updated WLCG converter to prioritize experiment field from scitags over the traditional vo field and include activity information
Reviewed Changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| Collectors/decoding.py | Adds new eainfo namedtuple and eaInfo() function to parse scitags messages containing experiment and activity codes |
| Collectors/DetailedCollector.py | Loads scitags mapping configuration, processes 'U' header code for eainfo messages, and stores eainfo data with user sessions |
| Collectors/wlcg_converter.py | Prioritizes 'experiment' field over 'vo' field and adds 'activity' field with default value 'unknown' |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| eainfo = decoding.eaInfo(rest, self._scitags_mapping) | ||
|
|
||
| try: | ||
| self.logger.debug("Adding new eainfo: %s.", eainfo) | ||
| self._users[sid][userInfo]['eainfo'] = eainfo | ||
| except KeyError: | ||
| self.logger.warning("Received eainfo for a server or user not seen yet") | ||
|
|
Copilot
AI
Nov 11, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing error handling for decoding.eaInfo(). If the function raises exceptions due to malformed input (IndexError, ValueError, KeyError from the issues in eaInfo implementation), the collector will crash. Consider wrapping the call in a try-except block to handle and log these errors gracefully.
| eainfo = decoding.eaInfo(rest, self._scitags_mapping) | |
| try: | |
| self.logger.debug("Adding new eainfo: %s.", eainfo) | |
| self._users[sid][userInfo]['eainfo'] = eainfo | |
| except KeyError: | |
| self.logger.warning("Received eainfo for a server or user not seen yet") | |
| try: | |
| eainfo = decoding.eaInfo(rest, self._scitags_mapping) | |
| except (IndexError, ValueError, KeyError) as exc: | |
| self.logger.error("Failed to decode eainfo: %s. Input: %r", exc, rest) | |
| eainfo = None | |
| if eainfo is not None: | |
| try: | |
| self.logger.debug("Adding new eainfo: %s.", eainfo) | |
| self._users[sid][userInfo]['eainfo'] = eainfo | |
| except KeyError: | |
| self.logger.warning("Received eainfo for a server or user not seen yet") |
| return xfrinfo([lfn, tod, sz, tm, op, rc, pd]) | ||
|
|
||
|
|
||
| def eaInfo(message, scitags_mapping): |
Copilot
AI
Nov 11, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing documentation for the eaInfo function. Consider adding a docstring that explains the expected message format (e.g., the structure of the '&'-delimited parameters) and the expected structure of the scitags_mapping dictionary, similar to how other parsing functions like revServerInfo are documented.
| experiment = scitags_maping['experiments'][expc]['expName'] | ||
| activity = scitags_maping['experiments'][expc]['activities'][actc]['activityName'] |
Copilot
AI
Nov 11, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Typo in variable name: scitags_maping should be scitags_mapping (missing 'p'). This will cause a NameError at runtime since the parameter is named scitags_mapping.
| experiment = scitags_maping['experiments'][expc]['expName'] | |
| activity = scitags_maping['experiments'][expc]['activities'][actc]['activityName'] | |
| experiment = scitags_mapping['experiments'][expc]['expName'] | |
| activity = scitags_mapping['experiments'][expc]['activities'][actc]['activityName'] |
| r = message.split(b'&') | ||
|
|
||
| udid = r[1].split(b'=')[1] | ||
| expc = int(r[2].split(b'=')[1]) - 1 | ||
| actc = int(r[3].split(b'=')[1]) - 1 | ||
|
|
||
| experiment = scitags_maping['experiments'][expc]['expName'] | ||
| activity = scitags_maping['experiments'][expc]['activities'][actc]['activityName'] | ||
| return eainfo(udid, experiment, activity) | ||
|
|
||
|
|
Copilot
AI
Nov 11, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing error handling for malformed input. If message doesn't contain the expected format (e.g., fewer than 4 elements after splitting by '&', or missing '=' delimiters), this will raise an IndexError. Consider adding validation or try-except blocks similar to the error handling pattern used elsewhere in the codebase.
| r = message.split(b'&') | |
| udid = r[1].split(b'=')[1] | |
| expc = int(r[2].split(b'=')[1]) - 1 | |
| actc = int(r[3].split(b'=')[1]) - 1 | |
| experiment = scitags_maping['experiments'][expc]['expName'] | |
| activity = scitags_maping['experiments'][expc]['activities'][actc]['activityName'] | |
| return eainfo(udid, experiment, activity) | |
| try: | |
| r = message.split(b'&') | |
| # Ensure there are enough elements | |
| if len(r) < 4: | |
| raise ValueError("Malformed input: not enough fields in message") | |
| udid = r[1].split(b'=')[1] | |
| expc = int(r[2].split(b'=')[1]) - 1 | |
| actc = int(r[3].split(b'=')[1]) - 1 | |
| experiment = scitags_mapping['experiments'][expc]['expName'] | |
| activity = scitags_mapping['experiments'][expc]['activities'][actc]['activityName'] | |
| return eainfo(udid, experiment, activity) | |
| except (IndexError, ValueError, KeyError) as e: | |
| # Handle malformed input gracefully | |
| print(f"eaInfo: Malformed input or mapping error: {e}", file=sys.stderr) | |
| return None |
| experiment = scitags_maping['experiments'][expc]['expName'] | ||
| activity = scitags_maping['experiments'][expc]['activities'][actc]['activityName'] |
Copilot
AI
Nov 11, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Potential IndexError when accessing the mapping. The code subtracts 1 from expc and actc but doesn't validate that the resulting indices are within bounds of the experiments or activities arrays. If the values are 0 (resulting in -1) or exceed the array length, this will cause a runtime error. Consider adding bounds checking or try-except error handling.
| experiment = scitags_maping['experiments'][expc]['expName'] | |
| activity = scitags_maping['experiments'][expc]['activities'][actc]['activityName'] | |
| experiments = scitags_maping['experiments'] | |
| if expc < 0 or expc >= len(experiments): | |
| raise ValueError(f"Experiment index out of bounds: {expc}") | |
| activities = experiments[expc]['activities'] | |
| if actc < 0 or actc >= len(activities): | |
| raise ValueError(f"Activity index out of bounds: {actc}") | |
| experiment = experiments[expc]['expName'] | |
| activity = activities[actc]['activityName'] |
| with open(self._scitags_mapping_file) as scitag_file: | ||
| self._scitags_mapping = json.load(scitag_file) |
Copilot
AI
Nov 11, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing error handling for file operations. If the mapping file doesn't exist, is not readable, or contains invalid JSON, this will raise an exception during initialization and crash the collector. Consider adding try-except blocks to handle FileNotFoundError and json.JSONDecodeError, with appropriate logging and fallback behavior (e.g., setting self._scitags_mapping = None).
| if self._scitags_mapping: | ||
| eainfo = decoding.eaInfo(rest, self._scitags_mapping) | ||
|
|
||
| try: | ||
| self.logger.debug("Adding new eainfo: %s.", eainfo) | ||
| self._users[sid][userInfo]['eainfo'] = eainfo | ||
| except KeyError: | ||
| self.logger.warning("Received eainfo for a server or user not seen yet") | ||
|
|
Copilot
AI
Nov 11, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
NameError: rest and userInfo are undefined in this scope. These variables are only defined in the else block below (lines 609 and 618), but they're being referenced here in the elif block. When header.code == b'U', this will cause a NameError. The 'U' header code handling needs to parse its own rest and userInfo from the data similar to how other header codes do it, or the logic needs to be restructured.
| if self._scitags_mapping: | |
| eainfo = decoding.eaInfo(rest, self._scitags_mapping) | |
| try: | |
| self.logger.debug("Adding new eainfo: %s.", eainfo) | |
| self._users[sid][userInfo]['eainfo'] = eainfo | |
| except KeyError: | |
| self.logger.warning("Received eainfo for a server or user not seen yet") | |
| infolen = len(data) - 4 | |
| mm = decoding.mapheader._make(struct.unpack("!I" + str(infolen) + "s", data)) | |
| try: | |
| userRec, rest = mm.info.split(b'\n', 1) | |
| except ValueError: | |
| userRec = mm.info | |
| rest = b'' | |
| try: | |
| userInfo = decoding.userInfo(userRec) | |
| except ValueError as ve: | |
| self.logger.exception("Failed to decode userInfo >>%s<< mapping message from %s mm: %s", | |
| header.code, addr, mm) | |
| userInfo = None | |
| if self._scitags_mapping and userInfo is not None: | |
| eainfo = decoding.eaInfo(rest, self._scitags_mapping) | |
| try: | |
| self.logger.debug("Adding new eainfo: %s.", eainfo) | |
| self._users[sid][userInfo]['eainfo'] = eainfo | |
| except KeyError: | |
| self.logger.warning("Received eainfo for a server or user not seen yet") |
|
Been running on a fork of the collector for a while on our side, but let me double check I didn't add anything else on top since this was submitted |
|
Copilot found some typos and errors in the code, is there an updated version of the code you are running? |
This PR adds support for decoding and parsing eainfo streams (scitags) containing activity and experiment code.
The mapping file is a JSON containing the contents of https://scitags.docs.cern.ch/api.json