#!/usr/bin/env python -Wd
"""
Licensed Materials - Property of Pipelinefx L.L.C.

(C) COPYRIGHT Pipelinefx Limited Liability Corporation.
All Rights Reserved.

US Government Users Restricted Rights - Use, duplication or
disclosure restricted by GSA ADP Schedule Contract with
Pipelinefx L.L.C.

$DateTime$
$Change$
$File$

A script used to export the MySQL data of Qube supervisor versions 6.9-2
through 6.10-0a (tableversion v37) to .csv files, to be imported into the new
PostgreSQL server of Qube 7.0 and above, using the accompanying script,
import_data_into_pgsql.py.

usage: export_data_from_mysql.py [-h] [-u USER] [-p PASSWORD] [-m MYSQL_DIR]
                                 [--dump_root DUMP_ROOT] [dump_root]

Qube MySQL Data Exporter

positional arguments:
  dump_root             dump directory (default: qube_mysqldump)

optional arguments:
  -h, --help            show this help message and exit
  --dump_root DUMP_ROOT
                        dump directory (same as the positional argument)
  -u USER, --user USER  DB user (default: root)
  -p PASSWORD, --password PASSWORD
                        DB user password (default: None)
  -m MYSQL_DIR, --mysql_dir MYSQL_DIR
                        mysql installation directory (default: /usr, or
                        /usr/local/mysql, or C:/Program Files/pfx/qube/mysql)
"""

import subprocess
import re
import os
import sys
import platform
import errno
import logging
import shutil
from optparse import OptionParser

# set up a global logger
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
sh = logging.StreamHandler()
formatter = logging.Formatter('%(lineno)d:%(levelname)s: %(message)s')
sh.setFormatter(formatter)
logger.addHandler(sh)

# SQL script (looked up relative to the current working directory) that
# restores the expected column ordering of the Qube tables
FIX_COLUMN_ORDER_SQL = 'fix_mysql_column_orders.sql'

# only this tableversion can be converted to postgresql
REQUIRED_TABLE_VERSION = 37


def default_mysql_dir():
    """Return the per-platform default MySQL installation directory.

    Platforms other than Windows and Darwin (macOS) fall back to "/usr".
    """
    return {
        'Windows': 'C:/Program Files/pfx/qube/mysql',
        'Darwin': '/usr/local/mysql',
    }.get(platform.system(), '/usr')


def parse_args(argv=None):
    """Parse the command line and return the optparse options object.

    argv -- list of arguments to parse; None (the default) means
            sys.argv[1:].

    The dump directory may be given either with --dump_root or as a single
    positional argument; the positional form wins when both are present.
    More than one positional argument is a usage error.
    """
    parser = OptionParser(usage="%prog [options] [dump_root]",
                          description="Qube MySQL Data Exporter")
    parser.set_defaults(
        dump_root="qube_mysqldump",
        user='root',
        mysql_dir=default_mysql_dir(),
    )
    parser.add_option("--dump_root",
                      help="dump directory (default: qube_mysqldump)")
    parser.add_option("-u", "--user", help="DB user (default: root)")
    parser.add_option("-p", "--password", help="DB user password")
    parser.add_option("-m", "--mysql_dir",
                      help="mysql installation directory")

    opts, args = parser.parse_args(argv)
    if len(args) > 1:
        parser.error("expected at most one dump_root argument")
    if args:
        opts.dump_root = args[0]
    return opts


def mask_password(cmd):
    """Return a copy of the argument list *cmd* that is safe to log.

    Any "--password=..." argument has its value replaced by asterisks.
    """
    return ['--password=********' if arg.startswith('--password=') else arg
            for arg in cmd]


def client_command(executable, user, password, *extra):
    """Build the argument list for a mysql/mysqldump invocation.

    The "--password=" argument is only added when a password was supplied.
    """
    cmd = [executable]
    if password:
        cmd.append('--password=%s' % password)
    cmd.extend(['-u', user])
    cmd.extend(extra)
    return cmd


def run_and_capture(cmd, error_msg):
    """Run *cmd* (an argument list, no shell) and return its stdout as text.

    error_msg -- message logged (followed by "-- <details>") when the
                 command cannot be started or exits non-zero.

    Raises OSError in both failure cases.
    """
    logger.info('cmd: %s' % ' '.join(mask_password(cmd)))
    try:
        # universal_newlines makes stdout/stderr text on Python 3 as well
        proc = subprocess.Popen(cmd, stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                universal_newlines=True)
    except OSError as e:
        logger.error("%s-- %s" % (error_msg, e))
        raise
    out, err = proc.communicate()
    if proc.returncode != 0:
        logger.error("%s-- %s" % (error_msg, err))
        raise OSError("%s-- %s" % (error_msg, err))
    return out


def add_jobid_column(data, jobid):
    """Prefix every row of a mysqldump --tab file with a quoted jobid column.

    data  -- the raw file contents (bytes)
    jobid -- the job id, as text or bytes

    Rows are separated by "\\n" only.  A physical line ending in a backslash
    is an escaped newline inside a field, so the line that follows it is a
    continuation of the same row and does not get the prefix.  Other line
    boundary characters such as a bare "\\r" are field data and are kept
    untouched.  The result always ends with a single "\\n".
    """
    if not isinstance(jobid, bytes):
        jobid = jobid.encode('ascii')
    prefix = b'"' + jobid + b'",'

    # drop the final row terminator so split() yields no phantom empty row
    if data.endswith(b'\n'):
        data = data[:-1]

    rows = []
    continuation = False
    for line in data.split(b'\n'):
        rows.append(line if continuation else prefix + line)
        continuation = line.endswith(b'\\')
    return b'\n'.join(rows) + b'\n'


#
# main()
#
def main():
    """Export all Qube MySQL databases to csv files under the dump directory.

    Steps: locate the mysql executables, verify the Qube tableversion, fix
    the table column ordering, dump every "qube"/"<N>qube" database with
    mysqldump --tab, then add the jobid column to the per-job csv files.

    Exits with status 1 when the mysql executables cannot be found; raises
    OSError/ValueError (after logging) on any other failure.
    """
    # parse the command line for options and args
    opts = parse_args()

    dump_root = opts.dump_root
    dbuser = opts.user
    mysqldir = opts.mysql_dir or default_mysql_dir()

    mysqlcmd = mysqldir + "/bin/mysql"
    mysqldump = mysqldir + "/bin/mysqldump"
    if platform.system() == 'Windows':
        mysqlcmd += '.exe'
        mysqldump += '.exe'

    mysqlcmd = os.path.normpath(mysqlcmd)
    mysqldump = os.path.normpath(mysqldump)

    if not (os.path.isfile(mysqlcmd) and os.path.isfile(mysqldump)):
        logger.fatal('Unable to locate executable mysql and/or mysqldump: is "mysql_dir" option correct?')
        logger.fatal('mysqldir = %s' % mysqldir)
        sys.exit(1)

    # first, check that we're dealing with the correct tableversion, which is
    # 37-- conversion to postgresql won't work otherwise

    # get the tableversion from mysql (options: -N "skip column names", -s "silent")
    cmd = client_command(mysqlcmd, dbuser, opts.password,
                         '-Ns', '-e', 'SELECT * FROM qube.tableversion')
    out = run_and_capture(
        cmd,
        "cannot run mysql command[%s] to get the qube mysql database table version" % mysqlcmd)

    # detect the current MySQL Qube DB version
    try:
        dbver = int(out)
    except ValueError as e:
        logger.error("problem detecting Qube DB version (corrupt qube.tableversion table?). Aborting. -- %s" % e)
        raise

    if dbver != REQUIRED_TABLE_VERSION:
        msg = "Qube DB version mismatch-- found version[%d], but Conversion requires version[37] (from Qube 6.10). Aborting." % dbver
        logger.error(msg)
        raise ValueError(msg)

    # fix the column ordering of the Qube tables, which may have gotten mixed
    # up if the system was upgraded.  The SQL script is fed on stdin instead of
    # through a shell redirect so that paths containing spaces work, and the
    # output is left attached to the console since this step can run long.
    cmd = client_command(mysqlcmd, dbuser, opts.password, 'qube')
    printable = '%s < %s' % (' '.join(mask_password(cmd)), FIX_COLUMN_ORDER_SQL)
    logger.info('fixing Qube DB table column orders... this may take a while and may appear it has stoppped-- DO NOT INTERRUPT!')
    logger.info('cmd: %s' % printable)
    try:
        with open(FIX_COLUMN_ORDER_SQL, 'rb') as sql_file:
            status = subprocess.call(cmd, stdin=sql_file)
    except EnvironmentError as e:
        logger.error("cannot run mysql command[%s] to fix Qube mysql database table column orders-- %s" % (printable, e))
        raise
    if status != 0:
        logger.error("child returned non-zero")
        raise OSError("mysql command[%s] returned %d" % (printable, status))

    # now get a list of all "Nqube" databases, and put it in "qube_dbs"
    cmd = client_command(mysqlcmd, dbuser, opts.password,
                         '-Ns', '-e', 'show databases')
    out = run_and_capture(
        cmd,
        "cannot run mysql command[%s] to get a list of qube database name" % mysqlcmd)

    qube_db_name_re = re.compile(r'^\d+qube$')
    qube_dbs = ['qube']
    qube_dbs.extend(name for name in out.splitlines()
                    if qube_db_name_re.search(name))
    # qube_dbs now contains a list of all MySQL "qube" databases

    # create or reset the dump_root dir
    if os.path.isdir(dump_root):
        logger.info('removing dump directory from a previous run...')
        shutil.rmtree(dump_root)

    try:
        os.mkdir(dump_root)
    except OSError as e:
        logger.error("cannot create the dump dir[%s]-- %s" % (dump_root, e))
        raise

    # make sure dump_root is wide-open
    try:
        os.chmod(dump_root, 0o777)
    except OSError as e:
        logger.error("cannot chmod the dump dir[%s]-- %s" % (dump_root, e))
        raise

    # now call mysqldump to generate csv dump files
    for idx, db in enumerate(qube_dbs):
        dump_dir = dump_root + "/" + db
        logger.info("Dumping data from database[%s] to folder[%s] (%d/%d)" % (db, dump_dir, idx + 1, len(qube_dbs)))

        # first create a dir
        os.mkdir(dump_dir)
        os.chmod(dump_dir, 0o777)

        cmd = client_command(mysqldump, dbuser, opts.password,
                             '--no-create-info', '--no-create-db',
                             '--tab=' + dump_dir,
                             '--fields-terminated-by=,',
                             '--fields-enclosed-by="',
                             db)
        run_and_capture(
            cmd,
            "cannot run mysqldump command[%s] to dump the main qube database tables into dir[%s]" % (mysqldump, dump_dir))

    # post-process all *{callback,subjob,variable,work}.txt files in the Nqube
    # subdirectories and add the jobid as the first column in each row
    tname_re = re.compile(r'^(\d+)(.*)\.txt')
    for dirpath, dirnames, filenames in os.walk(dump_root):
        # never descend into the main "qube" database dump
        if 'qube' in dirnames:
            dirnames.remove('qube')

        logger.info("Post-processing csv files in folder[%s]" % dirpath)
        for filename in filenames:
            filepath = os.path.join(dirpath, filename)
            if os.stat(filepath).st_size == 0:
                continue

            match = tname_re.search(filename)
            if not match:
                continue

            jobid = match.group(1)
            ttype = match.group(2)
            if ttype == "event":
                continue

            # binary mode: no newline translation or decoding of the data
            try:
                with open(filepath, 'rb') as f:
                    data = f.read()
            except EnvironmentError as e:
                logger.exception('Exception opening file: {0}'.format(e))
                # nothing was read, so there is nothing valid to write back
                continue

            try:
                with open(filepath, 'wb') as f:
                    f.write(add_jobid_column(data, jobid))
            except EnvironmentError as e:
                logger.exception('Exception opening file: {0}'.format(e))


if __name__ == "__main__":
    main()