Source code for py2neo.pep249

#!/usr/bin/env python
# -*- encoding: utf-8 -*-

# Copyright 2011-2021, Nigel Small
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""
This module provides an implementation of the Python Database API v2.0
(as specified in PEP 249) for Neo4j. The classes here are thin wrappers
over the regular py2neo functionality, so will provide a very similar
behaviour and experience to the rest of the library.

To get started, create a :class:`Connection` object using the
:func:`connect` function::

>>> from py2neo.pep249 import connect
>>> con = connect()

The arguments accepted by this function are identical to those accepted
by the :class:`.Graph` class in the core API. Therefore, a URI and any
combination of individual settings may be specified.

The :class:`Connection` object represents a single
client-server connection and can maintain a single transaction at any
point in time. Transactions are implicitly created through using a
method like :meth:`Cursor.execute` or can be explicitly
created using the :meth:`Connection.begin` method.

"""


from datetime import date, time, datetime
from time import localtime

from six import raise_from

from py2neo import Neo4jError
from py2neo.client import (Connection as _Connection,
                           ConnectionProfile as _ConnectionProfile)
from py2neo.errors import ConnectionUnavailable as _ConnectionUnavailable


apilevel = "2.0"
threadsafety = 0        # TODO
paramstyle = "cypher"   # TODO


def _date_from_ticks(ticks):
    return Date(*localtime(ticks)[:3])


def _time_from_ticks(ticks):
    return Time(*localtime(ticks)[3:6])


def _timestamp_from_ticks(ticks):
    return Timestamp(*localtime(ticks)[:6])


Date = date
Time = time
Timestamp = datetime
DateFromTicks = _date_from_ticks
TimeFromTicks = _time_from_ticks
TimestampFromTicks = _timestamp_from_ticks
Binary = memoryview


# noinspection PyShadowingBuiltins
[docs]class Warning(Exception): """ Exception raised for important warnings like data truncations while inserting, etc. """
[docs]class Error(Exception): """ Exception that is the base class of all other error exceptions. You can use this to catch all errors with one single except statement. Warnings are not considered errors and thus should not use this class as base. """
[docs]class InterfaceError(Error): """ Exception raised for errors that are related to the database interface rather than the database itself. """
[docs]class DatabaseError(Error): """ Exception raised for errors that are related to the database. """
[docs]class DataError(DatabaseError): """ Exception raised for errors that are due to problems with the processed data like division by zero, numeric value out of range, etc. """
[docs]class OperationalError(DatabaseError): """ Exception raised for errors that are related to the database's operation and not necessarily under the control of the programmer, e.g. an unexpected disconnect occurs, the data source name is not found, a transaction could not be processed, a memory allocation error occurred during processing, etc. """
[docs]class IntegrityError(DatabaseError): """ Exception raised when the relational integrity of the database is affected, e.g. a foreign key check fails. """
[docs]class InternalError(DatabaseError): """ Exception raised when the database encounters an internal error, e.g. the cursor is not valid anymore, the transaction is out of sync, etc. """
[docs]class ProgrammingError(DatabaseError): """ Exception raised for programming errors, e.g. table not found or already exists, syntax error in the SQL statement, wrong number of parameters specified, etc. """
[docs]class NotSupportedError(DatabaseError): """ Exception raised in case a method or database API was used which is not supported by the database, e.g. requesting a .rollback() on a connection that does not support transaction or has transactions turned off. """
[docs]class Connection(object): """ PEP249-compliant connection to a Neo4j server. """ Warning = Warning Error = Error InterfaceError = InterfaceError DatabaseError = DatabaseError DataError = DataError OperationalError = OperationalError IntegrityError = IntegrityError InternalError = InternalError ProgrammingError = ProgrammingError NotSupportedError = NotSupportedError def __init__(self, profile=None, **settings): profile = _ConnectionProfile(profile, **settings) try: self._cx = _Connection.open(profile) except _ConnectionUnavailable as error: raise_from(self.OperationalError("Connection unavailable"), error) self._tx = None self._db = None def __check__(self): if self._cx is None or self._cx.closed: raise self.ProgrammingError("Connection is closed") if self._cx.broken: raise self.OperationalError("Connection is broken") def __execute__(self, query, parameters=None): try: result = self._cx.run(self._tx, query, parameters) self._cx.pull(result) except Neo4jError as error: self._tx = None raise_from(self.OperationalError("Failed to execute query"), error) else: return result
[docs] def begin(self): """ Begin a transaction. :raises ProgrammingError: if the connection is closed :raises OperationalError: if the connection is broken or if the transaction fails to begin """ self.rollback() try: self._tx = self._cx.begin(self._db) except Neo4jError as error: raise_from(self.OperationalError("Failed to begin transaction"), error)
[docs] def commit(self): """ Commit any pending transaction to the database. :raises ProgrammingError: if the connection is closed :raises OperationalError: if the connection is broken or if the transaction fails to commit """ self.__check__() if self._tx is not None: try: self._cx.commit(self._tx) except Neo4jError as error: raise_from(self.OperationalError("Failed to commit transaction"), error) finally: self._tx = None
[docs] def rollback(self): """ Rollback any pending transaction. :raises ProgrammingError: if the connection is closed :raises OperationalError: if the connection is broken or if the transaction fails to rollback """ self.__check__() if self._tx is not None: try: self._cx.rollback(self._tx) except Neo4jError as error: raise_from(self.OperationalError("Failed to rollback transaction"), error) finally: self._tx = None
@property def in_transaction(self): """ True if a transaction is active, False otherwise. """ return self._tx is not None
[docs] def cursor(self): """ Construct a new :class:`.Cursor` object for this connection. """ self.__check__() return Cursor(self)
[docs] def execute(self, query, parameters=None): """ Execute a query on this connection. """ self.__check__() cursor = self.cursor() cursor.execute(query, parameters) return cursor
[docs] def executemany(self, query, seq_of_parameters): """ Execute a query on this connection once for each parameter set. """ self.__check__() cursor = self.cursor() cursor.executemany(query, seq_of_parameters)
[docs] def close(self): """ Close the connection immediately. The connection will be unusable from this point forward; a :class:`.ProgrammingError` exception will be raised if any operation is attempted with the connection. The same applies to all cursor objects trying to use the connection. Note that closing a connection without committing the changes first will cause an implicit rollback to be performed. """ if self._cx is not None and not self._cx.closed and not self._cx.broken: if self._tx is not None: try: self._cx.rollback(self._tx) except Neo4jError: pass finally: self._tx = None self._cx.close() self._cx = None
[docs]class Cursor(object): """ PEP249-compliant cursor attached to a Neo4j server. """ arraysize = 1 def __init__(self, connection): self._connection = connection self._result = None self._closed = False def __iter__(self): self.__check__() if self._result is None: return while True: record = self._result.take() if record is None: break yield tuple(record) def __check__(self): if self._closed: raise self._connection.ProgrammingError("Cursor is closed") self.connection.__check__() @property def connection(self): """ Connection to which this cursor is bound. """ return self._connection @property def description(self): """ Field details, each represented as a 7-tuple. """ if self._result is None: return None return [(name, None, None, None, None, None, None) for name in self._result.fields()] @property def rowcount(self): """ Number of rows affected by the last query executed. """ return -1 @property def summary(self): """ Dictionary of summary information relating to the last query executed. """ if self._result is None: return None return self._result.summary()
[docs] def close(self): """ Close the cursor immediately. """ self._closed = True
[docs] def execute(self, query, parameters=None): """ Execute a query. :raises ProgrammingError: if the cursor or the connection is closed :raises OperationalError: if the connection is broken """ self.__check__() if not self.connection.in_transaction: self.connection.begin() self._result = self.connection.__execute__(query, parameters) return self
[docs] def executemany(self, query, seq_of_parameters): """ Execute query multiple times with different parameter sets. :raises ProgrammingError: if the cursor or the connection is closed :raises OperationalError: if the connection is broken """ self.__check__() for parameters in seq_of_parameters: self.execute(query, parameters)
[docs] def fetchone(self): """ Fetch the next record, if available. :returns: record tuple or :py:const:`None` :raises ProgrammingError: if the cursor or the connection is closed :raises OperationalError: if the connection is broken """ self.__check__() if self._result is None: return None record = self._result.take() if record is None: return None return tuple(record)
[docs] def fetchmany(self, size=None): """ Fetch up to `size` records. :param size: :returns: list of record tuples :raises ProgrammingError: if the cursor or the connection is closed :raises OperationalError: if the connection is broken """ self.__check__() if self._result is None: return [] if size is None: size = self.arraysize records = [] for _ in range(size): record = self._result.take() if record is None: break records.append(tuple(record)) return records
[docs] def fetchall(self): """ Fetch all remaining records. :returns: list of record tuples :raises ProgrammingError: if the cursor or the connection is closed :raises OperationalError: if the connection is broken """ self.__check__() if self._result is None: return [] records = [] while True: record = self._result.take() if record is None: break records.append(tuple(record)) return records
def setinputsizes(self, sizes): pass def setoutputsize(self, size, column=None): pass
[docs]def connect(profile=None, **settings): """ Constructor for creating a connection to the database. """ return Connection(profile, **settings)