# Source code for unicorn_binance_local_depth_cache.manager

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# File: unicorn_binance_local_depth_cache/manager.py
#
# Part of ‘UNICORN Binance Local Depth Cache’
# Project website: https://www.lucit.tech/unicorn-binance-local-depth-cache.html
# Github: https://github.com/LUCIT-Systems-and-Development/unicorn-binance-local-depth-cache
# Documentation: https://unicorn-binance-local-depth-cache.docs.lucit.tech
# PyPI: https://pypi.org/project/unicorn-binance-local-depth-cache
# LUCIT Online Shop: https://shop.lucit.services/software
#
# License: LSOSL - LUCIT Synergetic Open Source License
# https://github.com/LUCIT-Systems-and-Development/unicorn-binance-local-depth-cache/blob/master/LICENSE
#
# Author: LUCIT Systems and Development
#
# Copyright (c) 2022-2023, LUCIT Systems and Development (https://www.lucit.tech)
# All rights reserved.

from .exceptions import DepthCacheOutOfSync
from operator import itemgetter
from lucit_licensing_python.manager import LucitLicensingManager
from lucit_licensing_python.exceptions import NoValidatedLucitLicense
from unicorn_binance_rest_api.manager import BinanceRestApiManager
from unicorn_binance_rest_api.exceptions import BinanceAPIException
from unicorn_binance_websocket_api.manager import BinanceWebSocketApiManager
from typing import Optional, Union
import cython
import copy
import logging
import platform
import requests
import time
import threading

__app_name__: str = "unicorn-binance-local-depth-cache"
__version__: str = "1.0.0.dev"

logger = logging.getLogger("unicorn_binance_local_depth_cache")


class BinanceLocalDepthCacheManager(threading.Thread):
    """
    A local Binance DepthCache Manager for Python that supports multiple depth caches in one instance in an easy,
    fast, flexible, robust and fully-featured way.

    Binance API documentation:
    https://developers.binance.com/docs/binance-api/spot-detail/web-socket-streams#diff-depth-stream
    https://binance-docs.github.io/apidocs/futures/en/#diff-book-depth-streams

    :param exchange: Select binance.com, binance.com-testnet, binance.com-margin, binance.com-margin-testnet,
                     binance.com-isolated_margin, binance.com-isolated_margin-testnet, binance.com-futures,
                     binance.com-futures-testnet, binance.com-coin_futures, binance.us, trbinance.com,
                     jex.com, binance.org or binance.org-testnet (default: binance.com)
    :type exchange: str
    :param default_refresh_interval: The default refresh interval in seconds, default is None.
    :type default_refresh_interval: int
    :param default_update_interval: Update speed of the depth webstream in milliseconds. More info:
                                    https://github.com/LUCIT-Systems-and-Development/unicorn-binance-local-depth-cache/wiki/update_intervals
                                    This can be overwritten with `update_interval` of `create_depth_cache()`.
    :type default_update_interval: int
    :param default_websocket_close_timeout: The `close_timeout` parameter defines a maximum wait time in seconds for
                                            completing the closing handshake and terminating the TCP connection.
                                            This parameter is passed through to `websockets.client.connect()
                                            <https://websockets.readthedocs.io/en/stable/topics/design.html?highlight=close_timeout#closing-handshake>`_
    :type default_websocket_close_timeout: int
    :param default_websocket_ping_interval: Once the connection is open, a `Ping frame` is sent every
                                            `ping_interval` seconds. This serves as a keepalive. It helps keeping
                                            the connection open, especially in the presence of proxies with short
                                            timeouts on inactive connections. Set `ping_interval` to `None` to
                                            disable this behavior.
                                            This parameter is passed through to `websockets.client.connect()
                                            <https://websockets.readthedocs.io/en/stable/topics/timeouts.html?highlight=ping_interval#keepalive-in-websockets>`_
    :type default_websocket_ping_interval: int
    :param default_websocket_ping_timeout: If the corresponding `Pong frame` isn't received within `ping_timeout`
                                           seconds, the connection is considered unusable and is closed with code
                                           1011. This ensures that the remote endpoint remains responsive. Set
                                           `ping_timeout` to `None` to disable this behavior.
                                           This parameter is passed through to `websockets.client.connect()
                                           <https://websockets.readthedocs.io/en/stable/topics/timeouts.html?highlight=ping_interval#keepalive-in-websockets>`_
    :type default_websocket_ping_timeout: int
    :param disable_colorama: set to True to disable the use of `colorama <https://pypi.org/project/colorama/>`_
    :type disable_colorama: bool
    :param warn_on_update: set to `False` to disable the update warning
    :type warn_on_update: bool
    :param lucit_api_secret: The `api_secret` of your UNICORN Binance Suite license from
                             https://shop.lucit.services/software/unicorn-binance-suite
    :type lucit_api_secret: str
    :param lucit_license_ini: Specify the path including filename to the config file (ex: `~/license_a.ini`). If not
                              provided lucitlicmgr tries to load a `lucit_license.ini` from the `.lucit` directory
                              of the user's home.
    :type lucit_license_ini: str
    :param lucit_license_profile: The license profile to use. Default is 'LUCIT'.
    :type lucit_license_profile: str
    :param lucit_license_token: The `license_token` of your UNICORN Binance Suite license from
                                https://shop.lucit.services/software/unicorn-binance-suite
    :type lucit_license_token: str
    :param ubra_manager: Provide a shared unicorn_binance_rest_api.manager instance
    :type ubra_manager: BinanceRestApiManager
    :param ubwa_manager: Provide a shared unicorn_binance_websocket_api.manager instance. Use
                         `enable_stream_signal_buffer=True
                         <https://unicorn-binance-websocket-api.docs.lucit.tech/unicorn_binance_websocket_api.html?highlight=enable_stream_signal_buffer%20true#module-unicorn_binance_websocket_api.manager>`_
                         otherwise the depth_cache will not work as it should!
    :type ubwa_manager: BinanceWebSocketApiManager
    """

    def __init__(self, exchange: str = "binance.com",
                 default_refresh_interval: int = None,
                 default_update_interval: int = None,
                 default_websocket_close_timeout: int = 2,
                 default_websocket_ping_interval: int = 5,
                 default_websocket_ping_timeout: int = 10,
                 disable_colorama: bool = False,
                 lucit_api_secret: str = None,
                 lucit_license_ini: str = None,
                 lucit_license_profile: str = None,
                 lucit_license_token: str = None,
                 ubra_manager: Optional[BinanceRestApiManager] = None,
                 ubwa_manager: Optional[BinanceWebSocketApiManager] = None,
                 warn_on_update: bool = True):
        super().__init__()
        self.name = __app_name__
        self.version = __version__
        logger.info(f"New instance of {self.get_user_agent()}-{'compiled' if cython.compiled else 'source'} on "
                    f"{str(platform.system())} {str(platform.release())} for exchange {exchange} started ...")
        self.exchange = exchange
        # One entry per market: asks/bids books, sync state and the stream bookkeeping.
        self.depth_caches = {}
        self.default_update_interval = default_update_interval
        self.default_refresh_interval = default_refresh_interval
        self.default_websocket_close_timeout = default_websocket_close_timeout
        self.default_websocket_ping_interval = default_websocket_ping_interval
        self.default_websocket_ping_timeout = default_websocket_ping_timeout
        self.disable_colorama = disable_colorama
        # Cache for the GitHub "latest release" lookup (see get_latest_version()).
        self.last_update_check_github = {'timestamp': time.time(), 'status': None}
        self.lucit_api_secret = lucit_api_secret
        self.lucit_license_ini = lucit_license_ini
        self.lucit_license_profile = lucit_license_profile
        self.lucit_license_token = lucit_license_token
        self.ubra = ubra_manager
        self.ubwa = ubwa_manager
        # Validate the LUCIT license before anything else; an invalid license aborts the init.
        self.llm = LucitLicensingManager(api_secret=self.lucit_api_secret,
                                         license_ini=self.lucit_license_ini,
                                         license_profile=self.lucit_license_profile,
                                         license_token=self.lucit_license_token,
                                         parent_shutdown_function=self.stop_manager,
                                         program_used=self.name,
                                         needed_license_type="UNICORN-BINANCE-SUITE",
                                         start=True)
        licensing_exception = self.llm.get_license_exception()
        if licensing_exception is not None:
            raise NoValidatedLucitLicense(licensing_exception)
        try:
            self.ubra = ubra_manager or BinanceRestApiManager(exchange=self.exchange,
                                                              disable_colorama=disable_colorama,
                                                              warn_on_update=warn_on_update,
                                                              lucit_api_secret=self.lucit_api_secret,
                                                              lucit_license_ini=self.lucit_license_ini,
                                                              lucit_license_profile=self.lucit_license_profile,
                                                              lucit_license_token=self.lucit_license_token)
        except requests.exceptions.ConnectionError as error_msg:
            error_msg = f"Can not initialize BinanceLocalDepthCacheManager() - No internet connection? - {error_msg}"
            logger.critical(error_msg)
            raise ConnectionRefusedError(error_msg)
        if ubwa_manager:
            # A shared websocket manager is only usable if its stream_signal_buffer is enabled,
            # because _process_stream_signals() relies on it to detect disconnects.
            if not ubwa_manager.is_stream_signal_buffer_enabled():
                error_msg = f"The shared `ubwa_manager` must use `enable_stream_signal_buffer=True` otherwise the " \
                            f"depth_cache will not work as it should! \r\n More info: " \
                            f"https://unicorn-binance-websocket-api.docs.lucit.tech/unicorn_binance_websocket_api." \
                            f"html?highlight=enable_stream_signal_buffer%20true#module-unicorn_binance_websocket_api" \
                            f".manager"
                logger.critical(error_msg)
                raise RuntimeWarning(error_msg)
        self.ubwa = ubwa_manager or BinanceWebSocketApiManager(exchange=self.exchange,
                                                               enable_stream_signal_buffer=True,
                                                               disable_colorama=True,
                                                               high_performance=True,
                                                               warn_on_update=warn_on_update,
                                                               lucit_api_secret=self.lucit_api_secret,
                                                               lucit_license_ini=self.lucit_license_ini,
                                                               lucit_license_profile=self.lucit_license_profile,
                                                               lucit_license_token=self.lucit_license_token)
        self.stop_request = False
        # Per-market locks guarding the ask/bid books against concurrent reader/writer access.
        self.threading_lock_ask = {}
        self.threading_lock_bid = {}
        if warn_on_update and self.is_update_available():
            update_msg = (f"Release {self.name}_{self.get_latest_version()} is available, please consider updating! "
                          f"(Changelog: https://unicorn-binance-local-depth-cache.docs.lucit.tech/changelog.html)")
            print(update_msg)
            logger.warning(update_msg)
        # Background worker that mirrors stream signals (DISCONNECT etc.) into the cache state.
        self.thread_stream_signals = threading.Thread(target=self._process_stream_signals)
        self.thread_stream_signals.start()

    def __enter__(self):
        logger.debug(f"Entering 'with-context' ...")
        return self

    def __exit__(self, exc_type, exc_value, error_traceback):
        # Leaving the `with` block always shuts the manager down, even on exceptions.
        logger.debug(f"Leaving 'with-context' ...")
        self.stop_manager()
        if exc_type:
            logger.critical(f"An exception occurred: {exc_type} - {exc_value} - {error_traceback}")

    def _add_depth_cache(self, market: str = None, stream_id: str = None, refresh_interval: int = None) -> bool:
        """
        Add a depth_cache to the depth_caches stack.

        :param market: Specify the market for the used depth_cache
        :type market: str
        :param stream_id: Provide a stream_id
        :type stream_id: str
        :param refresh_interval: The refresh interval in seconds, default is None.
        :type refresh_interval: int
        :return: bool
        """
        if market is not None:
            market = market.lower()
        if market and stream_id:
            self.depth_caches[market] = {'asks': {},
                                         'bids': {},
                                         'is_synchronized': False,
                                         'last_refresh_time': None,
                                         'last_update_id': None,
                                         'refresh_interval': refresh_interval or self.default_refresh_interval,
                                         'refresh_request': False,
                                         'stop_request': False,
                                         'stream_id': stream_id,
                                         'stream_status': None,
                                         'market': market,
                                         'thread': None,
                                         'thread_is_started': False}
            self.threading_lock_ask[market] = threading.Lock()
            self.threading_lock_bid[market] = threading.Lock()
            logger.debug(f"BinanceLocalDepthCacheManager._add_depth_cache() - Added new entry for market"
                         f" {market} and stream_id {stream_id}")
            return True
        else:
            logger.critical(f"BinanceLocalDepthCacheManager._add_depth_cache() - Not able to add entry for market"
                            f" {market} and stream_id {stream_id}")
            return False

    def _add_ask(self, ask, market: str = None) -> bool:
        """
        Add, update or delete an ask of a specific depth_cache.

        A quantity of "0.00000000"/"0.000" means the price level was removed from the book.

        :param ask: Add asks to the depth_cache (a `[price, quantity]` pair)
        :type ask: list
        :param market: Specify the market for the used depth_cache
        :type market: str
        :return: bool
        """
        if market is not None:
            market = market.lower()
        with self.threading_lock_ask[market]:
            self.depth_caches[market]['asks'][ask[0]] = float(ask[1])
            if ask[1] == "0.00000000" or ask[1] == "0.000":
                logger.debug(f"BinanceLocalDepthCacheManager._add_ask() - Deleting depth position {ask[0]} on ask "
                             f"side for market {market}")
                del self.depth_caches[market]['asks'][ask[0]]
            return True

    def _add_bid(self, bid, market: str = None) -> bool:
        """
        Add a bid to a specific depth_cache.

        A quantity of "0.00000000"/"0.000" means the price level was removed from the book.

        :param bid: Add bids to the depth_cache (a `[price, quantity]` pair)
        :type bid: list
        :param market: Specify the market symbol for the used depth_cache
        :type market: str
        :return: bool
        """
        if market is not None:
            market = market.lower()
        with self.threading_lock_bid[market]:
            self.depth_caches[market]['bids'][bid[0]] = float(bid[1])
            if bid[1] == "0.00000000" or bid[1] == "0.000":
                logger.debug(f"BinanceLocalDepthCacheManager._add_bid() - Deleting depth position {bid[0]} on bid "
                             f"side for market {market}")
                del self.depth_caches[market]['bids'][bid[0]]
            return True

    def _apply_updates(self, order_book: dict = None, market: str = None) -> bool:
        """
        Apply updates to a specific depth_cache.

        Accepts both the websocket diff format ('a'/'b' keys) and the REST snapshot
        format ('asks'/'bids' keys).

        :param order_book: Provide order_book data from rest or ws
        :type order_book: dict
        :param market: Specify the market symbol for the used depth_cache
        :type market: str
        :return: bool
        """
        if market is not None:
            market = market.lower()
        if order_book is None:
            return False
        logger.debug(f"BinanceLocalDepthCacheManager._apply_updates() - Applying updates to the depth_cache with "
                     f"market {market}")
        for ask in order_book.get('a', []) + order_book.get('asks', []):
            self._add_ask(ask, market=market)
        for bid in order_book.get('b', []) + order_book.get('bids', []):
            self._add_bid(bid, market=market)
        return True

    def _get_order_book_from_depth_cache(self, market: str = None) -> Optional[Union[dict, None]]:
        """
        Get the order_book of the chosen market.

        Downloads a fresh order book snapshot (limit 1000) via the REST API.

        NOTE(review): the log messages below carry the `_init_depth_cache()` prefix although
        they are emitted from this method — presumably a leftover from before this code was
        extracted into its own method; confirm before relying on log prefixes.

        :param market: Specify the market symbol for the used depth_cache
        :type market: str
        :return: dict or None
        """
        if market is not None:
            market = market.lower()
        try:
            if self.exchange == "binance.com" or self.exchange == "binance.com-testnet":
                try:
                    order_book = self.ubra.get_order_book(symbol=market.upper(), limit=1000)
                except BinanceAPIException as error_msg:
                    logger.error(f"BinanceLocalDepthCacheManager._init_depth_cache() - Can not download order_book "
                                 f"snapshot for the depth_cache with market {market} - BinanceAPIException - "
                                 f"error_msg: {error_msg}")
                    return None
            elif self.exchange == "binance.com-futures":
                try:
                    order_book = self.ubra.futures_order_book(symbol=market.upper(), limit=1000)
                except BinanceAPIException as error_msg:
                    logger.error(f"BinanceLocalDepthCacheManager._init_depth_cache() - Can not download order_book "
                                 f"snapshot for the depth_cache with market {market} - BinanceAPIException - "
                                 f"error_msg: {error_msg}")
                    return None
            else:
                # Unsupported exchange for snapshot download.
                return None
        except requests.exceptions.ConnectionError as error_msg:
            logger.error(f"BinanceLocalDepthCacheManager._init_depth_cache() - Can not download order_book snapshot "
                         f"for the depth_cache with market {market} - requests.exceptions.ConnectionError - "
                         f"error_msg: {error_msg}")
            return None
        except requests.exceptions.ReadTimeout as error_msg:
            logger.error(f"BinanceLocalDepthCacheManager._init_depth_cache() - Can not download order_book snapshot "
                         f"for the depth_cache with market {market} - requests.exceptions.ReadTimeout - "
                         f"error_msg: {error_msg}")
            return None
        logger.debug(f"BinanceLocalDepthCacheManager._init_depth_cache() - Downloaded order_book snapshot for "
                     f"the depth_cache with market {market}")
        return order_book

    def _init_depth_cache(self, market: str = None) -> bool:
        """
        Initialise the depth_cache with a rest snapshot.

        :param market: Specify the market symbol for the used depth_cache
        :type market: str
        :return: bool
        """
        if market is not None:
            market = market.lower()
        logger.info(f"BinanceLocalDepthCacheManager._init_depth_cache() - Starting initialization of the cache "
                    f"with market {market}")
        order_book = self._get_order_book_from_depth_cache(market=market)
        # NOTE(review): `_get_order_book_from_depth_cache()` returns None on failure, never False,
        # so this guard can never trigger — the failure (None) case only surfaces via the TypeError
        # handler below when `order_book['lastUpdateId']` is subscripted. TODO confirm and compare
        # against None here instead.
        if order_book is False:
            logger.info(f"BinanceLocalDepthCacheManager._init_depth_cache() - Can not get order_book of the cache "
                        f"with market {market}")
            return False
        self._reset_depth_cache(market=market)
        self.depth_caches[market]['last_refresh_time'] = int(time.time())
        self.depth_caches[market]['last_update_time'] = int(time.time())
        try:
            self.depth_caches[market]['last_update_id'] = int(order_book['lastUpdateId'])
        except TypeError as error_msg:
            # Reached when the snapshot download failed and `order_book` is None.
            logger.error(f"BinanceLocalDepthCacheManager._init_depth_cache() - TypeError - error_msg: {error_msg}")
            return False
        # NOTE(review): `_apply_updates()` already consumes the snapshot's 'asks'/'bids' keys, so
        # the two explicit loops below apply the same levels a second time (idempotent on a dict
        # book, so harmless, but redundant work) — TODO confirm.
        self._apply_updates(order_book, market=market)
        for bid in order_book['bids']:
            self._add_bid(bid, market=market)
        for ask in order_book['asks']:
            self._add_ask(ask, market=market)
        logger.debug(f"BinanceLocalDepthCacheManager._init_depth_cache() - Finished initialization of the cache "
                     f"with market {market}")
        return True

    def _process_stream_data(self, market: str = None) -> None:
        """
        Process depth stream_data.

        Worker loop of the per-market cache thread: buffer some events, take a REST
        snapshot, replay the buffered diffs onto it, then keep applying diffs while
        verifying the update-id sequence. Any gap, refresh request or exceeded
        refresh interval drops back to the (re-)initialization phase.

        The logic is described here:
        - Binance Spot: https://developers.binance.com/docs/binance-api/spot-detail/web-socket-streams#how-to-manage-a-local-order-book-correctly
        - Binance Futures: https://binance-docs.github.io/apidocs/futures/en/#diff-book-depth-streams

        :param market: Specify the market symbol for the used depth_cache
        :type market: str
        :return: None
        """
        if market is not None:
            market = market.lower()
        logger.debug(f"BinanceLocalDepthCacheManager._process_stream_data() - Started thread for stream_data of "
                     f"market {market}")
        self.depth_caches[market]['thread_is_started'] = True
        while self.is_stop_request(market=market) is False:
            # (Re-)initialization phase: invalidate the cache and start from a clean buffer.
            logger.debug(f"BinanceLocalDepthCacheManager._process_stream_data() - Clearing stream_buffer with stream_id"
                         f" {self.depth_caches[market]['stream_id']} of the "
                         f"cache of market {market} (stream_buffer length: "
                         f"{self.ubwa.get_stream_buffer_length(self.depth_caches[market]['stream_id'])}")
            self.depth_caches[market]['is_synchronized'] = False
            self.depth_caches[market]['refresh_request'] = False
            self.ubwa.clear_stream_buffer(self.depth_caches[market]['stream_id'])
            logger.debug(f"BinanceLocalDepthCacheManager._process_stream_data() - Cleared stream_buffer: "
                         f"{self.ubwa.get_stream_buffer_length(self.depth_caches[market]['stream_id'])} items")
            while self.ubwa.get_stream_buffer_length(self.depth_caches[market]['stream_id']) <= 2 and \
                    self.is_stop_request(market=market) is False:
                # Proceeding as soon as the first update event is received. On new websockets the first received message
                # is a "'result': None", so way wait for the second incoming message.
                logger.debug(f"BinanceLocalDepthCacheManager._process_stream_data() - Waiting for enough depth "
                             f"events for depth_cache with market {market}")
                time.sleep(0.1)
            logger.debug(f"BinanceLocalDepthCacheManager._process_stream_data() - Collected enough depth events, "
                         f"starting the initialization of the cache with market {market}")
            if not self._init_depth_cache(market=market):
                logger.error(f"BinanceLocalDepthCacheManager._process_stream_data() - Not able to initiate depth_cache "
                             f"with market {market}")
                continue
            while self.is_stop_request(market=market) is False:
                if self.depth_caches[market]['refresh_request'] is True:
                    self.depth_caches[market]['is_synchronized'] = False
                    logger.info(f"BinanceLocalDepthCacheManager._process_stream_data() - Caught refresh_request "
                                f"for depth_cache with market {market}")
                    break
                stream_data = self.ubwa.pop_stream_data_from_stream_buffer(self.depth_caches[market]['stream_id'])
                if stream_data and "'result': None" not in str(stream_data):
                    if self.depth_caches[market]['is_synchronized'] is False:
                        # Still searching for the first event that overlaps the snapshot's lastUpdateId.
                        if self.exchange == "binance.com" or self.exchange == "binance.com-testnet":
                            if int(stream_data['data']['u']) <= self.depth_caches[market]['last_update_id']:
                                # Drop it
                                logger.debug(f"BinanceLocalDepthCacheManager._process_stream_data() - Dropping "
                                             f"outdated depth update of the cache with market {market}")
                                continue
                            if int(stream_data['data']['U']) <= self.depth_caches[market]['last_update_id']+1 \
                                    <= int(stream_data['data']['u']):
                                # The first processed event should have U <= lastUpdateId+1 AND u >= lastUpdateId+1.
                                self._apply_updates(stream_data['data'], market=market)
                                logger.info(f"BinanceLocalDepthCacheManager._process_stream_data() - Finished "
                                            f"initialization of the cache with market {market}")
                                # Init (refresh) finished
                                self.depth_caches[market]['is_synchronized'] = True
                                self.depth_caches[market]['last_refresh_time'] = int(time.time())
                        elif self.exchange == "binance.com-futures":
                            if int(stream_data['data']['u']) < self.depth_caches[market]['last_update_id']:
                                # Drop it
                                logger.debug(f"BinanceLocalDepthCacheManager._process_stream_data() - Dropping "
                                             f"outdated depth update of the cache with market {market}")
                                continue
                            if int(stream_data['data']['U']) <= self.depth_caches[market]['last_update_id'] \
                                    <= int(stream_data['data']['u']):
                                # The first processed event should have U <= lastUpdateId AND u >= lastUpdateId
                                self._apply_updates(stream_data['data'], market=market)
                                logger.info(f"BinanceLocalDepthCacheManager._process_stream_data() - Finished "
                                            f"initialization of the cache with market {market}")
                                # Init (refresh) finished
                                self.depth_caches[market]['is_synchronized'] = True
                                self.depth_caches[market]['last_refresh_time'] = int(time.time())
                    else:
                        # Regular depth update events
                        if self.exchange == "binance.com" or self.exchange == "binance.com-testnet":
                            if stream_data['data']['U'] != self.depth_caches[market]['last_update_id']+1:
                                logger.error(f"BinanceLocalDepthCacheManager._process_stream_data() - There is a "
                                             f"gap between the last and the penultimate update ID, the "
                                             f"depth_cache `{market}` is no longer correct and must be "
                                             f"reinitialized")
                                break
                        elif self.exchange == "binance.com-futures":
                            if stream_data['data']['pu'] != self.depth_caches[market]['last_update_id']:
                                logger.error(f"BinanceLocalDepthCacheManager._process_stream_data() - There is a "
                                             f"gap between the last and the penultimate update ID, the depth_cache "
                                             f"`{market}` is no longer correct and must be reinitialized")
                                break
                        if self.depth_caches[market]['refresh_interval'] is not None:
                            if self.depth_caches[market]['last_refresh_time'] < int(time.time()) - \
                                    self.depth_caches[market]['refresh_interval']:
                                logger.info(f"BinanceLocalDepthCacheManager._process_stream_data() - The refresh "
                                            f"interval has been exceeded, start new initialization for depth_cache "
                                            f"`{market}`")
                                break
                        logger.debug(f"BinanceLocalDepthCacheManager._process_stream_data() - Applying regular "
                                     f"depth update to the depth_cache with market {market} - update_id: "
                                     f"{stream_data['data']['U']} - {stream_data['data']['u']}")
                        self._apply_updates(stream_data['data'], market=market)
                        self.depth_caches[market]['last_update_id'] = stream_data['data']['u']
                        self.depth_caches[market]['last_update_time'] = int(time.time())
                else:
                    time.sleep(0.001)
        # Exiting ...
        del self.depth_caches[market]
        del self.threading_lock_ask[market]
        del self.threading_lock_bid[market]
        logger.info(f"BinanceLocalDepthCacheManager._process_stream_data() - depth_cache `{market}` was "
                    f"stopped and cleared")

    def _process_stream_signals(self) -> None:
        """
        Process stream_signals.

        Mirrors websocket lifecycle signals into the cache state: a DISCONNECT
        invalidates the affected cache and requests a refresh.

        :return: None
        """
        logger.debug(f"BinanceLocalDepthCacheManager._process_stream_signals() - Started thread for stream_signals")
        while self.is_stop_request() is False:
            stream_signal = self.ubwa.pop_stream_signal_from_stream_signal_buffer()
            if stream_signal:
                logger.debug(f"BinanceLocalDepthCacheManager._process_stream_signals() - received stream_signal: "
                             f"{stream_signal}")
                for market in self.depth_caches:
                    if self.depth_caches[market]['stream_id'] == stream_signal['stream_id']:
                        if stream_signal['type'] == "DISCONNECT":
                            logger.debug(f"BinanceLocalDepthCacheManager._process_stream_signals() - Setting "
                                         f"stream_status of depth_cache with market {market} to `DISCONNECT")
                            self.depth_caches[market]['is_synchronized'] = False
                            self.depth_caches[market]['stream_status'] = "DISCONNECT"
                            self.ubwa.clear_stream_buffer(self.depth_caches[market]['stream_id'])
                            self.depth_caches[market]['refresh_request'] = True
                        elif stream_signal['type'] == "FIRST_RECEIVED_DATA":
                            logger.debug(f"BinanceLocalDepthCacheManager._process_stream_signals() - Setting "
                                         f"stream_status of depth_cache with market {market} to `RUNNING")
                            self.depth_caches[market]['stream_status'] = "RUNNING"
                        else:
                            logger.debug(f"BinanceLocalDepthCacheManager._process_stream_signals() - Setting "
                                         f"stream_status of depth_cache with market {market} to "
                                         f"`{stream_signal['type']}")
                            self.depth_caches[market]['stream_status'] = stream_signal['type']
            else:
                time.sleep(0.1)

    def _reset_depth_cache(self, market: str = None) -> bool:
        """
        Reset a depth_cache (delete all asks and bids).

        :param market: Specify the market symbol for the used depth_cache
        :type market: str
        :return: bool
        """
        if market is not None:
            market = market.lower()
        logger.debug(f"BinanceLocalDepthCacheManager._reset_depth_cache() - deleting all bids and ask of depth_cache "
                     f"with market {market}")
        with self.threading_lock_ask[market]:
            self.depth_caches[market]['asks'] = {}
        with self.threading_lock_bid[market]:
            self.depth_caches[market]['bids'] = {}
        return True

    @staticmethod
    def _sort_depth_cache(items, reverse=False) -> list:
        """
        Sort asks or bids by price.

        :param items: asks or bids
        :type items: dict
        :param reverse: False is regular, True is reversed
        :type reverse: bool
        :return: list of `[price, quantity]` pairs as floats
        """
        logger.debug(f"BinanceLocalDepthCacheManager._sort_depth_cache() - Start sorting")
        new_items = [[float(price), float(quantity)] for price, quantity in items.items()]
        new_items = sorted(new_items, key=itemgetter(0), reverse=reverse)
        return new_items
[docs] def create_depth_cache(self, markets: Optional[Union[str, list]] = None, update_interval: Optional[int] = None, refresh_interval: int = None, websocket_close_timeout: int = None, websocket_ping_interval: int = None, websocket_ping_timeout: int = None) -> bool: """ Create one or more depth_cache! :param markets: Specify the market symbols for caches to be created :type markets: str or list :param update_interval: Update speed of the depth webstream in milliseconds. More info: https://github.com/LUCIT-Systems-and-Development/unicorn-binance-local-depth-cache/wiki/update_intervals :type update_interval: int :param refresh_interval: The refresh interval in seconds, default is the `default_refresh_interval` of `BinanceLocalDepthCache <https://unicorn-binance-local-depth-cache.docs.lucit.tech/unicorn_binance_local_depth_cache.html?highlight=default_refresh_interval#unicorn_binance_local_depth_cache.manager.BinanceLocalDepthCacheManager>`_. :type refresh_interval: int :param websocket_close_timeout: The `close_timeout` parameter defines a maximum wait time in seconds for completing the closing handshake and terminating the TCP connection. This parameter is passed through to the `websockets.client.connect() <https://websockets.readthedocs.io/en/stable/topics/design.html?highlight=close_timeout#closing-handshake>`_ :type websocket_close_timeout: int :param websocket_ping_interval: Once the connection is open, a `Ping frame` is sent every `ping_interval` seconds. This serves as a keepalive. It helps keeping the connection open, especially in the presence of proxies with short timeouts on inactive connections. Set `ping_interval` to `None` to disable this behavior. 
This parameter is passed through to the `websockets.client.connect() <https://websockets.readthedocs.io/en/stable/topics/timeouts.html?highlight=ping_interval#keepalive-in-websockets>`_ :type websocket_ping_interval: int :param websocket_ping_timeout: If the corresponding `Pong frame` isn't received within `ping_timeout` seconds, the connection is considered unusable and is closed with code 1011. This ensures that the remote endpoint remains responsive. Set `ping_timeout` to `None` to disable this behavior. This parameter is passed through to the `websockets.client.connect() <https://websockets.readthedocs.io/en/stable/topics/timeouts.html?highlight=ping_interval#keepalive-in-websockets>`_ :type websocket_ping_timeout: int :return: bool """ if markets is None: return False if isinstance(markets, str): markets = [markets, ] for market in markets: market = market.lower() try: if self.depth_caches[market]: logger.warning(f"BinanceLocalDepthCacheManager.create_depth_cache() - depth_cache " f"{market} already exists!") return True except KeyError: logger.debug(f"BinanceLocalDepthCacheManager.create_depth_cache() - No existing depth_cache for " f"market {market} found!") update_interval = update_interval or self.default_update_interval if update_interval is None: channel = f"depth" else: channel = f"depth@{update_interval}ms" stream_id = self.ubwa.create_stream(channel, market, stream_buffer_name=True, stream_label=f"ubldc_{market}", output="dict", close_timeout=websocket_close_timeout or self.default_websocket_close_timeout, ping_timeout=websocket_ping_interval or self.default_websocket_ping_interval, ping_interval=websocket_ping_timeout or self.default_websocket_ping_timeout) self._add_depth_cache(market=market, stream_id=stream_id, refresh_interval=refresh_interval) self.depth_caches[market]['thread'] = threading.Thread(target=self._process_stream_data, args=(market,)) self.depth_caches[market]['thread'].start() while self.depth_caches[market]['thread_is_started'] is 
False: # This is to await the creation of the thread to avoid errors if the main thread gets closed before. # This can happen if after calling `create_depth_cache()` the main thread has no more code and exits. logger.debug(f"BinanceLocalDepthCacheManager.create_depth_cache() - Waiting till thread for " f"market {market} is started") time.sleep(0.01) return True
[docs] def get_asks(self, market: str = None) -> list: """ Get the current list of asks with price and quantity. :param market: Specify the market symbol for the used depth_cache :type market: str :return: list """ if market is not None: market = market.lower() if self.depth_caches[market]['is_synchronized'] is False: try: raise DepthCacheOutOfSync(f"The depth_cache for market symbol '{market}' is out of sync, " f"please try again later") except KeyError: raise KeyError(f"Invalid value provided: market={market}") if market: with self.threading_lock_ask[market]: return self._sort_depth_cache(self.depth_caches[market]['asks'], reverse=False) else: raise KeyError(f"Missing parameter `market`")
[docs] def get_bids(self, market: str = None) -> list: """ Get the current list of bids with price and quantity. :param market: Specify the market symbol for the used depth_cache :type market: str :return: list """ if market is not None: market = market.lower() try: if self.depth_caches[market]['is_synchronized'] is False: raise DepthCacheOutOfSync(f"The depth_cache for market symbol '{market}' is out of sync, " f"please try again later") except KeyError: raise KeyError(f"Invalid value provided: market={market}") if market: with self.threading_lock_bid[market]: return self._sort_depth_cache(self.depth_caches[market]['bids'], reverse=True) else: raise KeyError(f"Missing parameter `market`")
[docs] @staticmethod def get_latest_release_info() -> Optional[Union[dict, None]]: """ Get info about the latest available release :return: dict or None """ logger.debug(f"BinanceLocalDepthCacheManager.get_latest_release_info() - Starting the request") try: respond = requests.get(f"https://api.github.com/repos/LUCIT-Systems-and-Development/" f"unicorn-binance-local-depth-cache/releases/latest") latest_release_info = respond.json() return latest_release_info except Exception: return None
[docs] def get_latest_version(self) -> str: """ Get the version of the latest available release (cache time 1 hour) :return: str """ logger.debug(f"BinanceLocalDepthCacheManager.get_latest_version() - Starting the request") # Do a fresh request if status is None or last timestamp is older 1 hour if self.last_update_check_github['status'] is None or \ (self.last_update_check_github['timestamp'] + (60 * 60) < time.time()): self.last_update_check_github['status'] = self.get_latest_release_info() if self.last_update_check_github['status']: try: return self.last_update_check_github['status']['tag_name'] except KeyError: return "unknown" else: return "unknown"
[docs] def get_list_of_depth_caches(self) -> list: """ Get a list of existing depth caches :return: list """ logger.debug(f"BinanceLocalDepthCacheManager.get_list_of_depth_caches() - Create and then return the list") depth_cache_list = [] for depth_cache in self.depth_caches: depth_cache_list.append(depth_cache) return depth_cache_list
def get_user_agent(self):
    """
    Get the user_agent string "lib name + lib version + python version"

    :return: str
    """
    lib_part = f"{self.name}_{str(self.get_version())}"
    python_part = f"python_{str(platform.python_version())}"
    return f"{lib_part}-{python_part}"
def is_depth_cache_synchronized(self, market: str = None) -> bool:
    """
    Is a specific depth_cache synchronized?

    :param market: Specify the market symbol for the used depth_cache
    :type market: str
    :return: bool
    """
    if market is not None:
        market = market.lower()
    logger.debug(f"BinanceLocalDepthCacheManager.is_depth_cache_synchronized() - Returning the status")
    status = self.depth_caches[market]['is_synchronized']
    return status
def is_stop_request(self, market: str = None) -> bool:
    """
    Is there a stop request?

    :param market: Specify the market symbol for the used depth_cache
    :type market: str
    :return: bool - True if the manager (or the given depth_cache) has a stop request.
    """
    if market is not None:
        market = market.lower()
    logger.debug(f"BinanceLocalDepthCacheManager.is_stop_request() - Returning the status for market {market}")
    if market is None:
        # Only the manager-wide flag is relevant.
        return self.stop_request is not False
    # Either a manager-wide or a per-depth_cache stop request counts.
    return not (self.stop_request is False and self.depth_caches[market]['stop_request'] is False)
def is_update_available(self) -> bool:
    """
    Is a new release of this package available?

    :return: bool - True if the latest GitHub release differs from the installed version.
    """
    logger.debug(f"BinanceLocalDepthCacheManager.is_update_available() - Starting the request")
    installed_version = self.get_version()
    if ".dev" in installed_version:
        # Strip the ".dev" suffix so a dev build compares equal to its release version.
        installed_version = installed_version[:-4]
    # Fetch once and reuse - the original called get_latest_version() twice,
    # which can trigger two remote lookups when the cache is cold.
    latest_version = self.get_latest_version()
    if latest_version == installed_version or latest_version == "unknown":
        return False
    return True
def get_version(self) -> str:
    """
    Get the package/module version

    :return: str
    """
    logger.debug(f"BinanceLocalDepthCacheManager.get_version() - Returning the version")
    current_version = self.version
    return current_version
def print_summary(self, add_string=None):
    """
    Print an overview of all streams

    Delegates to the websocket manager's summary, titled with this lib's user agent.

    :param add_string: text to add to the output
    :type add_string: str
    """
    title = self.get_user_agent()
    self.ubwa.print_summary(add_string=add_string, title=title)
def set_refresh_request(self, markets: Optional[Union[str, list]] = None) -> bool:
    """
    Set refresh requests for one or more depth_caches!

    :param markets: Specify the market symbols for the depth_caches to be refreshed
    :type markets: str or list
    :return: bool - False if no market was provided, True otherwise.
    """
    if markets is None:
        logger.critical(f"BinanceLocalDepthCacheManager.set_refresh_request() - Please provide a market")
        return False
    # Accept a single symbol as well as a list of symbols.
    market_list = [markets, ] if isinstance(markets, str) else markets
    for market in (symbol.lower() for symbol in market_list):
        logger.info(f"BinanceLocalDepthCacheManager.set_refresh_request() - Set refresh request for "
                    f"depth_cache {market}")
        self.depth_caches[market]['refresh_request'] = True
    return True
def stop_depth_cache(self, markets: Optional[Union[str, list]] = None) -> bool:
    """
    Stop and delete one or more depth_caches!

    :param markets: Specify the market symbols for the depth_caches to be stopped and deleted
    :type markets: str or list
    :return: bool - False if no market was provided, True otherwise.
    """
    if markets is None:
        logger.critical(f"BinanceLocalDepthCacheManager.stop_depth_cache() - Please provide a market")
        return False
    # Accept a single symbol as well as a list of symbols.
    market_list = [markets, ] if isinstance(markets, str) else markets
    for symbol in market_list:
        symbol = symbol.lower()
        logger.info(f"BinanceLocalDepthCacheManager.stop_depth_cache() - Setting stop_request for "
                    f"depth_cache {symbol}, stop its stream and clear the stream_buffer")
        stream_id = copy.deepcopy(self.depth_caches[symbol]['stream_id'])
        self.depth_caches[symbol]['stop_request'] = True
        self.ubwa.stop_stream(stream_id=stream_id)
        # NOTE(review): presumably waits for the stream to shut down before the
        # buffer is cleared - confirm whether 10s is required or can be event-driven.
        time.sleep(10)
        self.ubwa.clear_stream_buffer(stream_buffer_name=stream_id)
    return True
def stop_manager(self, close_api_session: bool = True) -> bool:
    """
    Stop unicorn-binance-local-depth-cache with all sub routines

    :param close_api_session: If True, also close the LUCIT license manager and its API session.
    :type close_api_session: bool
    :return: bool
    """
    logger.debug(f"BinanceLocalDepthCacheManager.stop_manager() - Stop initiated!")
    self.stop_request = True
    # Stop the websocket manager first, then the REST manager (skip any that were never created).
    for sub_manager in (self.ubwa, self.ubra):
        if sub_manager is not None:
            sub_manager.stop_manager()
    # close lucit license manger and the api session
    if close_api_session is True:
        self.llm.close()
    return True
def stop_manager_with_all_depth_caches(self) -> bool:
    """
    Alias of `stop_manager()`.

    :return: bool
    """
    logger.debug(f"BinanceLocalDepthCacheManager.stop_manager_with_all_depth_caches() - Stop initiated!")
    result = self.stop_manager()
    return result