# Source code for gmql.managers

from .FileManagment.DependencyManager import DependencyManager
from .FileManagment import get_user_dir
from .RemoteConnection.SessionManager import load_sessions, store_sessions
from .FileManagment import TempFileManager
from .settings import get_remote_address, get_configuration, get_master, get_local_java_options,  get_spark_configs
from .configuration import Configuration
import py4j
from subprocess import Popen, PIPE
from py4j.java_gateway import JavaGateway, launch_gateway, GatewayParameters, \
    OutputConsumer, ProcessConsumer, quiet_close
import os
import time
import atexit
import signal
import warnings
import logging
import re
from findspark import find

# Module-wide state. Every slot starts empty and is filled lazily.

# Remote-service bookkeeping: the RemoteManager is created by login(), the
# session manager is loaded by init_managers().
__remote_manager = __session_manager = None

# Local helpers created by init_managers(): DependencyManager and SourcesTable.
__dependency_manager = __source_table = None

# Handles on the running JVM backend, assigned by start().
__gateway = __pythonManager = None

# Jar locations: set manually through set_backend_path()/set_py4j_path(), or
# resolved automatically by __check_dependencies().
__gmql_jar_path = __py4j_path = None


def start():
    """ Start the GMQL backend and keep a handle on it

    If the configured master starts with ``local``, the backend jar is run in
    a JVM launched through py4j's ``launch_gateway`` (using
    ``$JAVA_HOME/bin/java``), and the Spark and system configurations are then
    pushed to it. Any other master (an optional ``spark_`` prefix is stripped)
    is handled by running ``spark-submit`` in client mode and connecting to the
    port number that the backend prints on its standard output.

    :return: None
    :raises SystemError: if JAVA_HOME is not set (local master), or if the
        spark-submit process closes its output before printing a port
    """
    global __pythonManager, __gateway, __dependency_manager, __gmql_jar_path, __py4j_path
    logger = logging.getLogger()
    master = get_master()

    if master.lower().startswith('local'):
        logger.debug("Starting LOCAL backend (master: {})".format(master.lower()))
        java_home = os.environ.get("JAVA_HOME")
        if java_home is None:
            raise SystemError("The environment variable JAVA_HOME is not set")
        java_path = os.path.join(java_home, "bin", "java")
        _port = launch_gateway(classpath=__gmql_jar_path, die_on_exit=True,
                               java_path=java_path, javaopts=get_local_java_options(),
                               jarpath=__py4j_path)
        __gateway = JavaGateway(gateway_parameters=GatewayParameters(port=_port,
                                                                     auto_convert=True))
        python_api_package = get_python_api_package(__gateway)
        __pythonManager = start_gmql_manager(python_api_package)

        conf = get_configuration()
        conf.set_master(master.lower())
        _set_spark_configuration(conf)
        _set_system_configuration(conf)
    else:
        # use spark-submit
        logger.debug("Submitting backend to {}".format(master))
        master = re.sub("^spark_", "", master.lower())
        configs = get_spark_configs()
        spark_location = find()
        logger.debug("Found spark at location: {}".format(spark_location))
        command = [os.path.join(spark_location, 'bin', 'spark-submit'), '--master', master, '--deploy-mode', "client"]

        for cname, c in configs.items():
            command.extend(['--conf', '{}={}'.format(cname, c)])

        command.append(__gmql_jar_path)

        # The child process inherits its own copy of this descriptor, so the
        # parent's handle can be closed right after Popen -- also on failure.
        stderr = open(os.devnull, "w")
        try:
            proc = Popen(command, stdout=PIPE, stdin=PIPE, stderr=stderr)
        finally:
            quiet_close(stderr)

        # The backend announces its listening port on stdout; any other
        # (non-numeric) line printed before it is skipped.
        while True:
            line = proc.stdout.readline()
            if not line:
                # EOF: readline() would keep returning b'' forever once
                # spark-submit has exited, turning this loop into a busy spin.
                raise SystemError("spark-submit terminated before the GMQL backend "
                                  "reported its port (exit code: {})".format(proc.poll()))
            try:
                _port = int(line)
                break
            except ValueError:
                pass

        logger.debug("Backend listening at port {}".format(_port))
        # Keep draining the child's stdout so it never blocks on a full pipe.
        redirect_stdout = open(os.devnull, "w")
        OutputConsumer(redirect_stdout, proc.stdout, daemon=True).start()
        ProcessConsumer(proc, [redirect_stdout], daemon=True).start()

        __gateway = JavaGateway(gateway_parameters=GatewayParameters(port=_port,
                                                                     auto_convert=True))
        pm = __gateway.entry_point.getPythonManager()
        pm.startEngine()
        __pythonManager = pm


def _set_spark_configuration(conf):
    """ Forward the Spark settings held by ``conf`` to the backend PythonManager

    :param conf: a Configuration instance
    :return: None
    :raises TypeError: if ``conf`` is not a Configuration
    """
    if isinstance(conf, Configuration):
        manager = get_python_manager()
        manager.setSparkConfiguration(conf.app_name, conf.master, conf.get_spark_confs())
    else:
        raise TypeError("Configuration is required. {} was passed".format(type(conf)))


def _set_system_configuration(conf):
    """ Forward the system settings held by ``conf`` to the backend PythonManager

    :param conf: a Configuration instance
    :return: None
    :raises TypeError: if ``conf`` is not a Configuration
    """
    if isinstance(conf, Configuration):
        get_python_manager().setSystemConfiguration(conf.get_system_confs())
        return
    raise TypeError("Configuration is required. {} was passed".format(type(conf)))


def __check_py4j_backend():
    """ Locate the py4j jar matching the installed py4j package

    The jar is looked up in the user directory as ``py4j-<version>.jar``; when
    missing, it is fetched from the Sonatype ``releases`` repository and saved
    there first.

    :return: path of the py4j jar
    """
    version = py4j.__version__
    jar_file = os.path.join(get_user_dir(), "py4j-{}.jar".format(version))
    if os.path.isfile(jar_file):
        return jar_file

    remote_location = DependencyManager.find_package(
        repo="https://oss.sonatype.org/content/repositories/releases/",
        repo_name="releases",
        groupId="net.sf.py4j",
        artifactId="py4j",
        version=version,
    )
    DependencyManager.download_from_location(remote_location, jar_file)
    return jar_file


def set_backend_path(path):
    """ Manually set the scala backend of the library

    The path is only used if the backend has not been started yet. Otherwise
    a warning is emitted and the call has no effect.

    :param path: location of the jar file
    :return: None
    """
    global __gmql_jar_path
    if is_backend_on():
        # Previously this was a silent no-op, hiding the fact that the running
        # backend keeps using the old jar.
        warnings.warn("The backend is already running: the backend path {} "
                      "will not be used".format(path))
        return
    __gmql_jar_path = path


def set_py4j_path(path):
    """ Manually set the py4j backend of the library

    The path is only used if the backend has not been started yet. Otherwise
    a warning is emitted and the call has no effect.

    :param path: location of the jar file
    :return: None
    """
    global __py4j_path
    if is_backend_on():
        # Previously this was a silent no-op, hiding the fact that the running
        # backend keeps using the old jar.
        warnings.warn("The backend is already running: the py4j path {} "
                      "will not be used".format(path))
        return
    __py4j_path = path


def stop(*signal_args):
    """ Shut down the backend and clean up the resources of the library

    Stores the current sessions, shuts down the Java gateway, deletes the
    remote datasets that the source table marks as deletable and flushes the
    temporary files. Calling it again does not shut the gateway down twice.

    Python calls signal handlers with ``(signum, frame)``. When ``stop`` is
    invoked that way (e.g. if installed for SIGINT), it raises
    ``KeyboardInterrupt`` like the default SIGINT handler instead of failing
    with a ``TypeError``. The cleanup is then left to the ``atexit``
    registration, which runs it once if the interrupt ends the program.

    :param signal_args: optional ``(signum, frame)`` passed to signal handlers
    :return: None
    :raises KeyboardInterrupt: when called with signal-handler arguments
    """
    global __gateway, __session_manager, __remote_manager, __source_table
    if signal_args:
        raise KeyboardInterrupt
    # storing the session (the manager is None if init_managers() never ran)
    if __session_manager is not None:
        store_sessions(__session_manager.sessions)
    # killing the gateway; forget it so a second call does not shut it down again
    if __gateway is not None:
        __gateway.shutdown()
        __gateway = None
    # removing remote files
    if __remote_manager is not None and __source_table is not None:
        for rd in __source_table.get_deletable("remote"):
            __remote_manager.delete_dataset(rd)
    # flushing the tmp files
    TempFileManager.flush_everything()


def is_backend_on():
    """ Tell whether a backend PythonManager has been stored

    :return: True once start() has set the PythonManager, False otherwise
    """
    return not (__pythonManager is None)


def get_python_api_package(gateway):
    """ Return the ``it.polimi.genomics.pythonapi`` JVM package of ``gateway``

    :param gateway: a py4j JavaGateway
    :return: the JVM view of the backend's Python API package
    """
    genomics = gateway.jvm.it.polimi.genomics
    return genomics.pythonapi


def start_gmql_manager(python_api_package):
    """ Start the engine of the package's PythonManager and return it

    :param python_api_package: object exposing a ``PythonManager`` attribute
        (the backend's Python API package)
    :return: the PythonManager, after ``startEngine()`` has been called
    """
    manager = getattr(python_api_package, "PythonManager")
    manager.startEngine()
    return manager


def get_gateway():
    """ Return the py4j gateway, starting the backend first if there is none

    :return: the JavaGateway stored by start()
    """
    if __gateway is None:
        # Starting the GMQL manager
        start()
    return __gateway


def get_python_manager():
    """ Return the backend PythonManager, starting the backend first if needed

    :return: the PythonManager stored by start()
    """
    if __pythonManager is None:
        # Starting the GMQL manager
        start()
    return __pythonManager


def __initialize_source_table():
    """ Create a fresh SourcesTable and store it as the module's source table """
    # imported lazily, at call time rather than at module import
    from .dataset.loaders.Sources import SourcesTable
    global __source_table
    table = SourcesTable()
    __source_table = table


def get_source_table():
    """ Return the module's SourcesTable (None before init_managers() runs)

    :return: the source table
    """
    return __source_table


def __initialize_dependency_manager():
    """ Create the DependencyManager used to resolve the backend jar """
    global __dependency_manager
    manager = DependencyManager()
    __dependency_manager = manager


def __initialize_session_manager():
    """ Load the stored sessions and keep the resulting session manager """
    global __session_manager
    loaded = load_sessions()
    __session_manager = loaded


def login():
    """ Enables the user to login to the remote GMQL service.

    The function takes no credentials. If a session for the configured remote
    address was stored earlier, its authentication token is reused through
    ``RemoteManager.auto_login`` (which is expected to fall back to a guest
    login when the token is no longer valid). Otherwise a warning is emitted
    and the user is logged in as guest. In both cases the resulting session is
    recorded in the session manager.

    :return: None
    """
    from .RemoteConnection.RemoteManager import RemoteManager
    global __remote_manager, __session_manager
    logger = logging.getLogger()
    remote_address = get_remote_address()
    # presumably a tuple whose items 1 and 2 are the auth token and the login
    # mode -- that is how they are used below; confirm against SessionManager
    res = __session_manager.get_session(remote_address)
    if res is None:
        # there is no session for this address, let's login as guest
        warnings.warn("There is no active session for address {}. Logging as Guest user".format(remote_address))
        rm = RemoteManager(address=remote_address)
        rm.login()
        session_type = "guest"
    else:
        # there is a previous session for this address, let's do an auto login
        # using that access token
        logger.info("Logging using stored authentication token")
        rm = RemoteManager(address=remote_address, auth_token=res[1])
        # if the access token is not valid anymore (therefore we are in guest mode)
        # the auto_login function will perform a guest login from scratch
        session_type = rm.auto_login(how=res[2])
    # store the new session
    __remote_manager = rm
    access_time = int(time.time())
    auth_token = rm.auth_token
    __session_manager.add_session(remote_address, auth_token, access_time, session_type)
def logout():
    """ Log out from the remote GMQL service

    Delegates to the logout of the RemoteManager created by login().

    :return: None
    """
    manager = __remote_manager
    manager.logout()
def execute_remote():
    """ Delegate to ``execute_remote_all`` of the current RemoteManager

    :return: None
    """
    manager = __remote_manager
    manager.execute_remote_all()
def get_remote_manager():
    """ Give back the RemoteManager set by login() (None before any login)

    :return: a RemoteManager
    """
    return __remote_manager
def get_session_manager():
    """ Returns the session manager of the current instance of the library

    :return: a SessionManager
    """
    global __session_manager
    return __session_manager


def __initialize_logger():
    """ Configure the root logger at INFO level with a ``[PyGMQL]`` prefix

    ``logging.basicConfig`` does nothing if the root logger already has
    handlers, so an application's own logging setup is left alone.
    """
    log_fmt = '[PyGMQL] %(message)s'
    logging.basicConfig(level=logging.INFO, format=log_fmt)


def __check_dependencies():
    """ Resolve the backend and py4j jar paths unless they were set manually """
    global __gmql_jar_path, __py4j_path
    if __gmql_jar_path is None:
        __gmql_jar_path = __dependency_manager.resolve_dependencies()
    if __py4j_path is None:
        __py4j_path = __check_py4j_backend()


def init_managers():
    """ Initialize the module state

    Creates the source table, session manager and dependency manager,
    configures logging, registers stop() to run at interpreter exit, and
    resolves the jar paths needed by the backend.

    :return: None
    """
    __initialize_source_table()
    __initialize_session_manager()
    __initialize_dependency_manager()
    __initialize_logger()
    atexit.register(stop)
    # SIGINT is deliberately left to Python's default handler. It raises
    # KeyboardInterrupt, and if that ends the program the atexit hook above
    # runs stop(). Installing stop() itself as the handler failed: handlers
    # receive (signum, frame). signal.signal also raises ValueError when
    # called outside the main thread.
    __check_dependencies()