|
| 1 | +from kafka.producer import KafkaProducer |
| 2 | +import sys |
| 3 | +import os |
| 4 | +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir))) |
| 5 | + |
| 6 | +from lib.commonsplunk import check_events_from_splunk |
| 7 | +from lib.commonkafka import * |
| 8 | +from lib.helper import * |
| 9 | +from datetime import datetime |
| 10 | +import threading |
| 11 | +import logging.config |
| 12 | +import yaml |
| 13 | +import subprocess |
| 14 | +import logging |
| 15 | +import time |
| 16 | + |
| 17 | +logging.config.fileConfig(os.path.join(get_test_folder(), "logging.conf")) |
| 18 | +logger = logging.getLogger('connector_upgrade') |
| 19 | + |
| 20 | +_config_path = os.path.join(get_test_folder(), 'config.yaml') |
| 21 | +with open(_config_path, 'r') as yaml_file: |
| 22 | + config = yaml.load(yaml_file) |
| 23 | +now = datetime.now() |
| 24 | +_time_stamp = str(datetime.timestamp(now)) |
| 25 | + |
| 26 | + |
| 27 | +def start_old_connector(): |
| 28 | + cmds = ["test -f {0}/{1} && echo {0}/{1}".format(config["connector_path"], config["old_connector_name"]), |
| 29 | + "cd {}".format(config["kafka_home"]), |
| 30 | + "sudo ./bin/connect-distributed.sh {}/config/connect-distributed-quickstart.properties &". |
| 31 | + format(config["kafka_connect_home"])] |
| 32 | + |
| 33 | + cmd = "\n".join(cmds) |
| 34 | + try: |
| 35 | + proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, |
| 36 | + stderr=subprocess.STDOUT) |
| 37 | + proc.wait() |
| 38 | + except OSError as e: |
| 39 | + logger.error(e) |
| 40 | + |
| 41 | + |
| 42 | +def generate_kafka_events(num): |
| 43 | + # Generate message data |
| 44 | + topics = ["kafka_data_gen"] |
| 45 | + connector_content = { |
| 46 | + "name": "kafka_connect", |
| 47 | + "config": { |
| 48 | + "connector.class": "com.splunk.kafka.connect.SplunkSinkConnector", |
| 49 | + "tasks.max": "1", |
| 50 | + "splunk.indexes": config["splunk_index"], |
| 51 | + "topics": "kafka_data_gen", |
| 52 | + "splunk.hec.ack.enabled": "false", |
| 53 | + "splunk.hec.uri": config["splunk_hec_url"], |
| 54 | + "splunk.hec.ssl.validate.certs": "false", |
| 55 | + "splunk.hec.token": config["splunk_token"] |
| 56 | + } |
| 57 | + } |
| 58 | + create_kafka_connector(config, connector_content) |
| 59 | + create_kafka_topics(config, topics) |
| 60 | + producer = KafkaProducer(bootstrap_servers=config["kafka_broker_url"], |
| 61 | + value_serializer=lambda v: json.dumps(v).encode('utf-8')) |
| 62 | + |
| 63 | + for _ in range(num): |
| 64 | + msg = {"timestamp": _time_stamp} |
| 65 | + producer.send("kafka_data_gen", msg) |
| 66 | + time.sleep(0.05) |
| 67 | + producer.flush() |
| 68 | + |
| 69 | + |
| 70 | +def upgrade_connector(): |
| 71 | + cmds = ["sudo kill $(sudo lsof -t -i:8083) && sleep 2", |
| 72 | + "sudo rm {}/{} && sleep 2".format(config["connector_path"], config["old_connector_name"]), |
| 73 | + "sudo cp {0}/splunk-kafka-connect*.jar {1} && sleep 2".format(config["connector_build_target"], |
| 74 | + config["connector_path"]), |
| 75 | + "cd {}".format(config["kafka_home"]), |
| 76 | + "sudo ./bin/connect-distributed.sh {}/config/connect-distributed-quickstart.properties &". |
| 77 | + format(config["kafka_connect_home"])] |
| 78 | + |
| 79 | + cmd = "\n".join(cmds) |
| 80 | + try: |
| 81 | + proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, |
| 82 | + stderr=subprocess.STDOUT) |
| 83 | + output, error = proc.communicate() |
| 84 | + logger.info(output) |
| 85 | + except OSError as e: |
| 86 | + logger.error(e) |
| 87 | + |
| 88 | + |
| 89 | +if __name__ == '__main__': |
| 90 | + logger.info("Start old Kafka connector ...") |
| 91 | + thread_old_connect = threading.Thread(target=start_old_connector, daemon=True) |
| 92 | + thread_old_connect.start() |
| 93 | + time.sleep(10) |
| 94 | + logger.info("Generate Kafka events ...") |
| 95 | + thread_gen = threading.Thread(target=generate_kafka_events, args=(2000,), daemon=True) |
| 96 | + thread_gen.start() |
| 97 | + time.sleep(50) |
| 98 | + logger.info("Upgrade Kafka connector ...") |
| 99 | + thread_upgrade = threading.Thread(target=upgrade_connector, daemon=True) |
| 100 | + thread_upgrade.start() |
| 101 | + time.sleep(100) |
| 102 | + search_query = "index={0} | search timestamp=\"{1}\"".format(config['splunk_index'], _time_stamp) |
| 103 | + logger.info(search_query) |
| 104 | + events = check_events_from_splunk(start_time="-15m@m", |
| 105 | + url=config["splunkd_url"], |
| 106 | + user=config["splunk_user"], |
| 107 | + query=["search {}".format(search_query)], |
| 108 | + password=config["splunk_password"]) |
| 109 | + logger.info("Splunk received %s events in the last 15m", len(events)) |
| 110 | + assert len(events) == 2000 |
0 commit comments