diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index f429d4a..44edcbe 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,5 +1,11 @@ repos: - repo: https://github.com/pre-commit/pre-commit-hooks - rev: v1.3.0 + rev: v5.0.0 + hooks: + - id: check-yaml + - id: end-of-file-fixer + - id: trailing-whitespace +- repo: https://github.com/pycqa/flake8 + rev: 7.1.1 hooks: - id: flake8 diff --git a/README.md b/README.md index 91ff96d..0666183 100644 --- a/README.md +++ b/README.md @@ -47,19 +47,18 @@ cp settings_example.py settings.py Set up your environment. ```bash -pip3 install virtualenv -virtualenv venv3 +python3 -m venv env source venv3/bin/activate pip install -r requirements.txt ``` Run the script. ```bash -python pull_alerts.py +python pull_alerts.py --include-low ``` ## Timezone By default, local timezone is used to print alert time. To change default timezone use `TZ` environment variable: ```bash TZ=UTC python pull_alerts.py -``` \ No newline at end of file +``` diff --git a/pull_alerts.py b/pull_alerts.py index b773b8b..08100a5 100644 --- a/pull_alerts.py +++ b/pull_alerts.py @@ -1,113 +1,115 @@ -from __future__ import absolute_import -from __future__ import division, print_function, unicode_literals +# Copy this file into pull_alerts.py in from https://github.com/lyft/opsreview +# To include low_urgency call it like this: python pull_alerts.py --include-low + +from __future__ import (absolute_import, division, print_function, + unicode_literals) import argparse import logging -import urllib -from collections import Counter, OrderedDict, defaultdict, namedtuple +from collections import defaultdict from datetime import datetime, timedelta -from dateutil import tz -import dateutil.parser +import dateutil.parser import pygerduty.v2 -from prettytable import PrettyTable - -try: - import settings -except ImportError: - print("*** Error: Follow setup instructions in README.md to create settings.py") - raise SystemExit(1) +from dateutil import relativedelta, tz +import settings logger = logging.getLogger(__name__) pagerduty_service = pygerduty.v2.PagerDuty(settings.PAGERDUTY_API_TOKEN) LOCAL_TZ = tz.tzlocal() -Tag = namedtuple("Tag", ["tag", "display_name"]) -TAGS = [ - Tag(tag="#a", display_name="Actionable (#a)"), - Tag(tag="#na", display_name="Non Actionable (#na)"), - Tag(tag="#t", display_name="Transient (#t)"), - Tag(tag="#s", display_name="Seasonal (#s)"), - Tag(tag="#abot", display_name="Actionable By Other Team (#abot)"), -] class FormattedIncident(object): def pretty_output(self): return u'Time: {}\nService: {}\nDescription: {}\nURL: {}\nNotes:\n{}\n'.format( - self.created_on.strftime('%A, %B %-d - %-I:%M %p %z'), + self.formatted_created_at, self.service, self.description, self.url, self.notes, ) + @property + def is_high_urgency(self): + return not (self.urgency == 'low' or '-low-' in self.service) + + @property + def formatted_created_at(self): + return self.created_on.strftime('%a, %b %-d - %-I:%M %p') -def recent_incidents_for_services(services, time_window): + +def recent_incidents_for_services(services): service_ids = [service.id for service in services] - try: - recent_incidents = list(pagerduty_service.incidents.list( - service_ids=service_ids, - since=datetime.now(tz=LOCAL_TZ) - time_window - )) - return recent_incidents - - except urllib.error.HTTPError as e: - if e.reason == 'URI Too Long': - mid_point = int(len(services)/2) - return recent_incidents_for_services( - services[:mid_point], - time_window, - ) + recent_incidents_for_services( - services[mid_point:], - time_window, - ) - raise + on_call_start = get_oncall_start() + on_call_end = on_call_start + timedelta(days=8) + recent_incidents = list(pagerduty_service.incidents.list( + service_ids=service_ids, + since=on_call_start, + until=on_call_end + )) + return recent_incidents + + +def get_oncall_start(): + # oncall starts on Wednesday 12PM + # get last Wed but not today if today is a Wed + today = datetime.now(tz=tz.tzlocal()) + today = today.replace(hour=12, minute=0, second=0, microsecond=0) + if today.weekday() == 2: + on_call_start = today + relativedelta.relativedelta(days=-1, weekday=relativedelta.WE(-1)) + else: + on_call_start = today + relativedelta.relativedelta(weekday=relativedelta.WE(-1)) + + return on_call_start def print_all_incidents( - silent, - time_window_days, - group_by_description=False, - group_by_service=False, - include_stats=False, - include_incidents_as_blockquote=False, + include_low ): services = [] for escalation_policy in settings.ESCALATION_POLICIES: services.extend(list(pagerduty_service.escalation_policies.show(escalation_policy).services)) - recent_incidents = recent_incidents_for_services(services, timedelta(days=time_window_days)) - formatted_incidents = get_formatted_incidents(recent_incidents) + recent_incidents = recent_incidents_for_services(services) + all_incidents = get_formatted_incidents(recent_incidents) + high_urg_incidents = [i for i in all_incidents if i.is_high_urgency] + low_urg_incidents = [i for i in all_incidents if not i.is_high_urgency] + print('\n########## High Urgency Pages ##########') + print_pages_by_description(high_urg_incidents) + if include_low: + print('\n########## Low Urgency Pages ##########') + print_pages_by_description(low_urg_incidents) - all_incidents, sorted_description_to_incident_list, sorted_service_to_incident_list = sort_incidents( - formatted_incidents, - group_by_description, - group_by_service - ) - print_stats(all_incidents, include_stats) - if include_incidents_as_blockquote: - print("""# Raw incident log -``` -""") - if group_by_service: - sorted_group_to_incident_list = sorted_service_to_incident_list - elif group_by_description: - sorted_group_to_incident_list = sorted_description_to_incident_list - if group_by_service or group_by_description: - for group, incident_list in sorted_group_to_incident_list.items(): - print("########### {}: {} ##########\n".format(len(incident_list), group)) - if not silent: - for incident in incident_list: - print(incident.pretty_output()) - else: - for incident in all_incidents: - print(incident.pretty_output()) + print_stats(high_urg_incidents, low_urg_incidents) print('Total Pages: {}'.format(len(all_incidents))) - if include_incidents_as_blockquote: - print("```") + + +def print_pages_by_notes(incidents): + note_to_incident_list = defaultdict(list) + for incident in incidents: + note_to_incident_list[incident.last_note].append(incident) + + for note, incidents in note_to_incident_list.items(): + print('\n{} generated {} incidents:'.format(note, len(incidents))) + for i in incidents: + print('\t- {} ({})'.format(i.description, i.url)) + + +def print_pages_by_description(incidents): + desc_to_incident_list = defaultdict(list) + for incident in incidents: + desc_to_incident_list[incident.description].append(incident) + + for desc, incidents in desc_to_incident_list.items(): + print('\n**{}** [Paged {} times]:'.format(desc, len(incidents))) + for i in incidents: + if i.last_note == 'NO NOTE': + print('- [alarm paged]({}) - no note'.format(i.url)) + else: + print('- [alarm paged]({}) - {}'.format(i.url, i.last_note)) def get_formatted_incidents(recent_incidents): @@ -116,6 +118,7 @@ def get_formatted_incidents(recent_incidents): formatted_incident = FormattedIncident() formatted_incident.service = incident.service.summary formatted_incident.url = incident.html_url + formatted_incident.urgency = incident.urgency if hasattr(incident, 'title'): formatted_incident.description = incident.title elif hasattr(incident, 'summary'): @@ -131,107 +134,70 @@ def get_formatted_incidents(recent_incidents): for note in notes: formatted_notes.append(u'{}: {}'.format(note.user.summary, note.content)) formatted_incident.notes = formatted_notes + formatted_incident.last_note = formatted_notes[-1] if formatted_notes else 'NO NOTE' formatted_incidents.append(formatted_incident) return formatted_incidents -def _tag_incident(incident, tag_stats): - tagged = False - for tag in TAGS: - found_tag = any(tag.tag in note for note in incident.notes) - if not found_tag: - continue - tagged = True - tag_stats[tag] += 1 - return tagged +def print_stats(high_urg_incidents, low_urg_incidents): + h_a, h_na, h_t, h_nt = get_breakdown(high_urg_incidents) + l_a, l_na, l_t, l_nt = get_breakdown(low_urg_incidents) + oncall_start = get_oncall_start() + oncall_end = oncall_start + timedelta(days=7) + formatted_start = oncall_start.strftime('%m/%d %H:%M') + formatted_end = oncall_end.strftime('%m/%d %H:%M') + print("""\n# Statistics from {} to {} +| Incidents | High Urgency | Low Urgency | +| -------------------- | ------------ | ----------- | +| Actionable (#a) | {:12} | {:11} | +| Non Actionable (#na) | {:12} | {:11} | +| Transient (#t) | {:12} | {:11} | +| Not Tagged | {:12} | {:11} | +| TOTAL | {:12} | {:11} | +""".format( + formatted_start, formatted_end, h_a, l_a, h_na, l_na, h_t, l_t, h_nt, l_nt, + len(high_urg_incidents), len(low_urg_incidents) + )) -def print_stats(all_incidents, include_stats): - if not include_stats: - return +def get_breakdown(incidents): + actionable = 0 + non_actionable = 0 + transient = 0 + not_tagged = 0 + for i in incidents: + if is_actionable(i): + actionable += 1 + elif is_non_actionable(i): + non_actionable += 1 + elif is_transient(i): + transient += 1 + else: + not_tagged += 1 + return actionable, non_actionable, transient, not_tagged - stats_table = PrettyTable() - stats_table.field_names = ["Incidents", "Number"] - stats_table.align["Incidents"] = "l" - stats_table.align["Number"] = "r" - tag_stats = Counter() +def is_actionable(incident): + return any('#a' in note for note in incident.notes) - not_tagged = 0 - for i in all_incidents: - tagged = _tag_incident(i, tag_stats) - not_tagged += not tagged - - for tag in TAGS: - stats_table.add_row([tag.display_name, tag_stats[tag]]) - stats_table.add_row(["Not Tagged", not_tagged]) - stats_table.add_row(["Total", len(all_incidents)]) - - print(stats_table) - - -def sort_incidents(all_incidents, group_by_description, group_by_service): - description_to_incident_list = defaultdict(list) - service_to_incident_list = defaultdict(list) - for incident in all_incidents: - description_to_incident_list[incident.description].append(incident) - for incident in all_incidents: - service_to_incident_list[incident.service].append(incident) - # Sort by desc count - sorted_description_to_incident_list = OrderedDict(sorted( - description_to_incident_list.items(), - key=lambda x: len(x[1]), - reverse=True - )) - sorted_service_to_incident_list = OrderedDict(sorted( - service_to_incident_list.items(), - key=lambda x: len(x[1]), - reverse=True - )) - if group_by_description: - all_incidents = [] - for incident_list in sorted_description_to_incident_list.values(): - all_incidents += incident_list - else: - all_incidents = sorted(all_incidents, key=lambda i: i.created_on) - return all_incidents, sorted_description_to_incident_list, sorted_service_to_incident_list +def is_non_actionable(incident): + return any('#na' in note for note in incident.notes) + + +def is_transient(incident): + return any('#t' in note for note in incident.notes) if __name__ == '__main__': logging.basicConfig() parser = argparse.ArgumentParser() - parser.add_argument("--silent", - action="store_true", - default=False, - help="Do not print each description") - parser.add_argument("--group-by-description", - action="store_true", - default=False, - help="Group PD incidents by description") - parser.add_argument("--group-by-service", - action="store_true", - default=False, - help="Group PD incidents by service") - parser.add_argument("--include-stats", - action="store_true", - default=False, - help="Include incidents stats") - parser.add_argument("--include-incidents-as-blockquote", + parser.add_argument("--include-low", action="store_true", default=False, - help="Include raw incident log as markdown blockquote") - parser.add_argument('--days', - type=int, - default=7, - help='time window days') + help="Include low urgency detailed view") args = parser.parse_args() print_all_incidents( - silent=args.silent, - group_by_description=args.group_by_description, - group_by_service=args.group_by_service, - include_stats=args.include_stats, - include_incidents_as_blockquote=args.include_incidents_as_blockquote, - time_window_days=args.days + include_low=args.include_low )