Module neosvrpy.client

This module defines the client that makes the requests to the NeosVR API.

Expand source code
"""
This module defines the client that makes the requests to the
NeosVR API.
"""

import dataclasses
import json
import logging
from datetime import datetime
from uuid import uuid4
from os import path as OSpath
from typing import Dict, List
from urllib.parse import ParseResult, urlparse

import dacite
from requests import Session
from requests import exceptions as requests_exceptions
from dateutil.parser import isoparse

from . import __version__
from .classes import (
    LoginDetails,
    NeosDirectory,
    NeosFriend,
    NeosLink,
    NeosRecord,
    NeosUser,
    NeosUserStatus,
    NeosMessage,
    NeosMessageType,
    neosMessageTypeMapping,
    NeosCloudVar,
    RecordType,
    recordTypeMapping,
    OnlineStatus,
    CurrentSessionAccessLevel,
    FriendStatus,
    OwnerType,
    NeosCloudVarDefs,
    Session as NeosSession,
)
from .utils import (
    nested_asdict_factory,
    getOwnerType,
)

from .endpoints import CLOUDX_NEOS_API
from neosvrpy import exceptions as neos_exceptions

# Shared dacite configuration passed to every ``dacite.from_dict`` call in
# this module.
DACITE_CONFIG = dacite.Config(
    # Types listed here are built by calling the type on the raw value
    # (used for the enum classes of the API).
    cast=[
        NeosMessageType,
        RecordType,
        OnlineStatus,
        CurrentSessionAccessLevel,
        FriendStatus
    ],
    # Raw values targeting these types are converted with the given callable.
    type_hooks={
        datetime: isoparse,
        ParseResult: urlparse,
    },
)

# Name of the file read by ``Client.loadToken`` and written by
# ``Client.saveToken``; it is relative to the current working directory.
AUTHFILE_NAME = "auth.token"


@dataclasses.dataclass
class Client:
    """Representation of a neosvrpy NeosVR client.

    Holds the credentials returned by the API and the ``requests`` session
    used for every HTTP call.
    """
    userId: str = None
    token: str = None
    expire: datetime = None  # This doesn't seem to be used by the API.
    rememberMe: bool = False
    # Time of the last login or session refresh; while it is unset the
    # refresh/expiry check in ``_request`` is skipped.
    lastUpdate: datetime = None
    secretMachineId: str = None
    # default_factory gives every client its own HTTP session. A plain
    # ``Session()`` default is evaluated once, so all clients would share the
    # same session object and therefore the same Authorization header.
    session: Session = dataclasses.field(default_factory=Session)

    @property
    def headers(self) -> dict:
        """Return the HTTP headers matching the current credentials.

        ``User-Agent`` is always present; ``Authorization`` is only added
        when both ``userId`` and ``token`` are set, otherwise a warning is
        logged.
        """
        default = {"User-Agent": f"neosvrpy/{__version__}"}
        if not self.userId or not self.token:
            logging.warning("WARNING: headers sections not set. this might throw an error soon...")
            return default
        default["Authorization"] = f"neos {self.userId}:{self.token}"
        return default

    @staticmethod
    def processRecordList(data: List[dict]):
        """Convert raw record dicts into their type-specific record objects.

        Each item is parsed once as a generic ``NeosRecord`` to read its
        ``recordType`` and then parsed again with the class registered for
        that type in ``recordTypeMapping``.
        """
        ret = []
        for raw_item in data:
            item = dacite.from_dict(NeosRecord, raw_item, DACITE_CONFIG)
            x = dacite.from_dict(recordTypeMapping[item.recordType], raw_item, DACITE_CONFIG)
            ret.append(x)
        return ret

    def _request(
            self, verb: str, path: str, data: dict = None, json: dict = None,
            params: dict = None, ignoreUpdate: bool = False
        ) -> Dict:
        """Send a request to the NeosVR API and return the decoded response.

        Args:
            verb: name of the ``requests.Session`` method to call
                (``'get'``, ``'post'``, ``'patch'``, ...).
            path: endpoint path appended to ``CLOUDX_NEOS_API``.
            data: request body, only sent when truthy.
            json: JSON payload, only sent when truthy.
            params: query-string parameters, only sent when truthy.
            ignoreUpdate: skip the session refresh/expiry check.

        Returns:
            The decoded JSON body of a 200 response, the raw response text
            when that body is not valid JSON, or ``None`` for a 204 response.

        Raises:
            neos_exceptions.InvalidCredentials: a non-200/204 response whose
                body contains "Invalid credentials".
            neos_exceptions.InvalidToken: a 403 response, or the local
                session age exceeded the refresh window.
            neos_exceptions.NeosAPIException: any other non-200/204 response,
                or a 200 response whose JSON object has a ``message`` key.
        """
        if self.lastUpdate and not ignoreUpdate:
            lastUpdate = self.lastUpdate
            # From PolyLogix/CloudX.js, the token seems to expire after 3600000 seconds
            # NOTE(review): as written, the session is refreshed before every
            # request made inside that window -- confirm this is intended.
            if (datetime.now() - lastUpdate).total_seconds() <= 3600000:
                self._request('patch', '/userSessions', ignoreUpdate=True)
                self.lastUpdate = datetime.now()
            # While the API dont seems to implement more security, official client behavior must be respected.
            # Only disconnect after 1 day of inactivity for now.
            # TODO: Implement disconnection after 1 week of inactivity when implementing the rememberMe feature.
            #if 64800 >= (datetime.now() - lastUpdate).total_seconds() >= 85536:
            #    self._request('patch', '/userSessions', ignoreUpdate=True)
            else:
                raise neos_exceptions.InvalidToken("Token expired")
        args = {'url': CLOUDX_NEOS_API + path}
        if data:
            args['data'] = data
        if json:
            args['json'] = json
        if params:
            args['params'] = params
        func = getattr(self.session, verb, None)
        with func(**args) as req:
            logging.debug("NeosAPI: [{}] {}".format(req.status_code, args))
            if req.status_code not in [200, 204]:
                if "Invalid credentials" in req.text:
                    raise neos_exceptions.InvalidCredentials(req.text)
                elif req.status_code == 403:
                    raise neos_exceptions.InvalidToken(req.headers)
                else:
                    raise neos_exceptions.NeosAPIException(req)
            if req.status_code == 200:
                try:
                    response = req.json()
                except requests_exceptions.JSONDecodeError:
                    return req.text
                # Only a JSON object can carry an error "message" key; the
                # isinstance guard keeps list or scalar bodies from being
                # probed (and indexed) as if they were dicts.
                if isinstance(response, dict) and "message" in response:
                    raise neos_exceptions.NeosAPIException(req, message=response["message"])
                return response
            # In case of a 204 response
            return

    def login(self, data: LoginDetails) -> None:
        """Open a user session with *data* and store the returned credentials.

        The session headers are updated afterwards so that later requests
        carry the ``Authorization`` header.
        """
        response = self._request('post', "/userSessions",
            json=dataclasses.asdict(data))
        self.userId = response["userId"]
        self.token = response["token"]
        self.secretMachineId = response["secretMachineId"]
        self.expire = isoparse(response["expire"])
        self.lastUpdate = datetime.now()
        self.session.headers.update(self.headers)

    def logout(self) -> None:
        """Delete the current user session on the API, then clear local state."""
        self._request('delete',
            "/userSessions/{}/{}".format(self.userId, self.token),
            ignoreUpdate=True,
        )
        self.clean_session()

    def clean_session(self) -> None:
        """Forget every stored credential and reset the session headers.

        Safe to call even when no ``Authorization`` header was ever set.
        """
        self.userId = None
        self.token = None
        self.expire = None
        self.secretMachineId = None
        self.lastUpdate = None
        # pop() instead of del: the header is absent when the client never
        # authenticated, and del would raise KeyError in that case.
        self.session.headers.pop("Authorization", None)
        self.session.headers.update(self.headers)

    def loadToken(self):
        """Restore the credentials saved in the ``AUTHFILE_NAME`` file.

        Raises:
            neos_exceptions.NoTokenError: if the file does not exist or the
                saved ``expire`` time is not in the future.
        """
        if OSpath.exists(AUTHFILE_NAME):
            with open(AUTHFILE_NAME, "r") as f:
                session = json.load(f)
                expire = datetime.fromisoformat(session["expire"])
                if datetime.now().timestamp() < expire.timestamp():
                    self.token = session["token"]
                    self.userId = session["userId"]
                    self.expire = expire
                    self.secretMachineId = session["secretMachineId"]
                    self.session.headers.update(self.headers)
                else:
                    raise neos_exceptions.NoTokenError
        else:
            raise neos_exceptions.NoTokenError

    def saveToken(self):
        """Write the current credentials to the ``AUTHFILE_NAME`` file as JSON."""
        with open(AUTHFILE_NAME, "w+") as f:
            json.dump(
                {
                    "userId": self.userId,
                    "expire": self.expire.isoformat(),
                    "token": self.token,
                    "secretMachineId": self.secretMachineId,
                },
                f,
            )

    def neosDBSignature(self, url: str) -> str:
        """Return the part of *url* between the first ``//`` and the next ``.``.

        For ``neosdb:///abc.webp`` this is ``/abc`` (leading slash included).
        """
        return url.split("//")[1].split(".")[0]

    def neosDbToHttp(self, iconUrl: str) -> str:
        """Translate a ``neosdb://`` asset URL into its HTTPS asset URL.

        The signature from ``neosDBSignature`` is appended verbatim to the
        assets base URL.
        """
        url = "https://assets.neos.com/assets"
        url = url + self.neosDBSignature(iconUrl)
        return url

    def getUserData(self, user: str = None) -> NeosUser:
        """Return the data of *user*, defaulting to the logged-in user."""
        if user is None:
            user = self.userId
        response = self._request('get', "/users/" + user)
        return dacite.from_dict(NeosUser, response, DACITE_CONFIG)

    def getSession(self, session_id: str) -> NeosSession:
        """ Return session information.
        """
        response = self._request('get', f'/sessions/{session_id}')
        return dacite.from_dict(NeosSession, response, DACITE_CONFIG)

    def getUserStatus(self, user: str = None) -> NeosUserStatus:
        """Return the status of *user*, defaulting to the logged-in user."""
        if user is None:
            user = self.userId
        response = self._request('get', '/users/' + user + '/status/')
        return dacite.from_dict(NeosUserStatus, response, DACITE_CONFIG)

    def getFriends(self):
        """
        returns the friends you have.

        Note: does not create friends out of thin air. you need to do that yourself.
        """
        response = self._request('get', f"/users/{self.userId}/friends")
        return [dacite.from_dict(NeosFriend, user, DACITE_CONFIG) for user in response]

    def getInventory(self) -> List[NeosRecord]:
        """
        The typical entrypoint to the inventory system.
        """
        response = self._request(
            'get',
            f"/users/{self.userId}/records",
            params={"path": "Inventory"},
        )
        return self.processRecordList(response)

    def getDirectory(self, directory: NeosDirectory) -> List[NeosRecord]:
        """
        Given a directory, return its contents.
        """
        response = self._request(
            'get',
            f"/users/{directory.ownerId}/records",
            params={"path": directory.content_path},
        )
        return self.processRecordList(response)

    def resolveLink(self, link: NeosLink) -> NeosDirectory:
        """
        Given a link-type record, return its directory. The directory can be
        passed to getDirectory.
        """
        _, user, record = link.assetUri.path.split("/")  # TODO: better
        response = self._request(
            'get',
            f"/users/{user}/records/{record}",
        )
        return dacite.from_dict(NeosDirectory, response, DACITE_CONFIG)

    def buildMessage(
        self, sender_id: str, recipiend_id: str, msg: str
    ) -> NeosMessage:
        """Create a text ``NeosMessage`` from *sender_id* to *recipiend_id*.

        A fresh ``MSG-<uuid4>`` id and the current local time are generated;
        the sender is also recorded as the owner.
        """
        message_type = NeosMessageType.TEXT
        builded_msg = {
            'id': f'MSG-{uuid4()}',
            'senderId': sender_id,
            'ownerId': sender_id,
            'sendTime': datetime.now().isoformat(),
            'recipientId': recipiend_id,
            'messageType': message_type,
            'content': msg,
        }
        return dacite.from_dict(NeosMessage, builded_msg, DACITE_CONFIG)

    def sendMessageLegacy(
        self, sender_id: str, recipiend_id: str, msg: str
    ) -> None:
        """Post the text *msg* from *sender_id* to *recipiend_id*."""
        self._request(
            'post',
            f'/users/{recipiend_id}/messages',
            json=dataclasses.asdict(
                self.buildMessage(sender_id, recipiend_id, msg),
                dict_factory=nested_asdict_factory,
            )
        )

    def getMessageLegacy(
        self, fromTime: str = None, maxItems: int = 100,
        user: str = None, unreadOnly: bool = False
    ) -> Dict:
        """Fetch messages of the logged-in user.

        Args:
            fromTime: not supported yet; any truthy value raises ``ValueError``.
            maxItems: sent as the ``maxItems`` query parameter.
            user: when truthy, sent as the ``user`` query parameter.
            unreadOnly: sent as the ``unreadOnly`` query parameter.

        Returns:
            The response of ``_request``, unmodified.

        Raises:
            ValueError: if *fromTime* is given.
        """
        params = {}
        if fromTime:
            raise ValueError('fromTime is not yet implemented')
        params['maxItems'] = maxItems
        params['unreadOnly'] = unreadOnly
        if user:
            params['user'] = user
        return self._request(
            'get',
            f'/users/{self.userId}/messages',
            params=params
        )

    def getOwnerPath(self, ownerId: str) -> str:
        """Return the API path segment (``users`` or ``groups``) for *ownerId*.

        Raises:
            ValueError: when the owner type is neither a user nor a group.
        """
        ownerType = getOwnerType(ownerId)
        if ownerType == OwnerType.USER:
            return "users"
        elif ownerType == OwnerType.GROUP:
            return "groups"
        else:
            raise ValueError(f"invalid ownerType for {ownerId}")

    def listCloudVar(self, ownerId: str) -> List[NeosCloudVar]:
        """Return every cloud variable owned by *ownerId*."""
        response = self._request(
            'get',
            f'/{self.getOwnerPath(ownerId)}/{ownerId}/vars'
        )
        return [dacite.from_dict(NeosCloudVar, cloud_var, DACITE_CONFIG) for cloud_var in response]

    def getCloudVar(self, ownerId: str, path: str) -> NeosCloudVar:
        """Return the cloud variable stored at *path* for *ownerId*."""
        response = self._request(
            'get',
            f'/{self.getOwnerPath(ownerId)}/{ownerId}/vars/{path}'
        )
        return dacite.from_dict(NeosCloudVar, response, DACITE_CONFIG)

    def getCloudVarDefs(self, ownerId: str, path: str) -> NeosCloudVarDefs:
        """Return the cloud variable definition at *path* for *ownerId*."""
        response = self._request(
            'get',
            f'/{self.getOwnerPath(ownerId)}/{ownerId}/vardefs/{path}'
        )
        return dacite.from_dict(NeosCloudVarDefs, response, DACITE_CONFIG)

    def setCloudVar(self, ownerId: str, path: str, value: str) -> None:
        """Store *value* in the cloud variable *path* owned by *ownerId*.

        Whatever ``_request`` returns for the PUT call is passed back
        unchanged.
        """
        return self._request(
            'put',
            f'/{self.getOwnerPath(ownerId)}/{ownerId}/vars/{path}',
            json = {
                "ownerId": ownerId,
                "path": path,
                "value": value,
            }
        )

    def searchUser(self, username: str) -> dict:
        """
        Return the users matching *username*.

        This is not the U- NeosVR user id, the API will search over usernames not ids.
        """
        return self._request(
            'get',
            '/users',
            params = {'name': username}
        )

Classes

class Client (userId: str = None, token: str = None, expire: datetime.datetime = None, rememberMe: bool = False, lastUpdate: datetime.datetime = None, secretMachineId: str = None, session: requests.sessions.Session = <requests.sessions.Session object>)

Representation of a neosvrpy NeosVR client.

Expand source code
@dataclasses.dataclass
class Client:
    """Representation of a neosvrpy NeosVR client.

    Holds the credentials returned by the API and the ``requests`` session
    used for every HTTP call.
    """
    userId: str = None
    token: str = None
    expire: datetime = None  # This doesn't seem to be used by the API.
    rememberMe: bool = False
    # Time of the last login or session refresh; while it is unset the
    # refresh/expiry check in ``_request`` is skipped.
    lastUpdate: datetime = None
    secretMachineId: str = None
    # default_factory gives every client its own HTTP session. A plain
    # ``Session()`` default is evaluated once, so all clients would share the
    # same session object and therefore the same Authorization header.
    session: Session = dataclasses.field(default_factory=Session)

    @property
    def headers(self) -> dict:
        """Return the HTTP headers matching the current credentials.

        ``User-Agent`` is always present; ``Authorization`` is only added
        when both ``userId`` and ``token`` are set, otherwise a warning is
        logged.
        """
        default = {"User-Agent": f"neosvrpy/{__version__}"}
        if not self.userId or not self.token:
            logging.warning("WARNING: headers sections not set. this might throw an error soon...")
            return default
        default["Authorization"] = f"neos {self.userId}:{self.token}"
        return default

    @staticmethod
    def processRecordList(data: List[dict]):
        """Convert raw record dicts into their type-specific record objects.

        Each item is parsed once as a generic ``NeosRecord`` to read its
        ``recordType`` and then parsed again with the class registered for
        that type in ``recordTypeMapping``.
        """
        ret = []
        for raw_item in data:
            item = dacite.from_dict(NeosRecord, raw_item, DACITE_CONFIG)
            x = dacite.from_dict(recordTypeMapping[item.recordType], raw_item, DACITE_CONFIG)
            ret.append(x)
        return ret

    def _request(
            self, verb: str, path: str, data: dict = None, json: dict = None,
            params: dict = None, ignoreUpdate: bool = False
        ) -> Dict:
        """Send a request to the NeosVR API and return the decoded response.

        Args:
            verb: name of the ``requests.Session`` method to call
                (``'get'``, ``'post'``, ``'patch'``, ...).
            path: endpoint path appended to ``CLOUDX_NEOS_API``.
            data: request body, only sent when truthy.
            json: JSON payload, only sent when truthy.
            params: query-string parameters, only sent when truthy.
            ignoreUpdate: skip the session refresh/expiry check.

        Returns:
            The decoded JSON body of a 200 response, the raw response text
            when that body is not valid JSON, or ``None`` for a 204 response.

        Raises:
            neos_exceptions.InvalidCredentials: a non-200/204 response whose
                body contains "Invalid credentials".
            neos_exceptions.InvalidToken: a 403 response, or the local
                session age exceeded the refresh window.
            neos_exceptions.NeosAPIException: any other non-200/204 response,
                or a 200 response whose JSON object has a ``message`` key.
        """
        if self.lastUpdate and not ignoreUpdate:
            lastUpdate = self.lastUpdate
            # From PolyLogix/CloudX.js, the token seems to expire after 3600000 seconds
            # NOTE(review): as written, the session is refreshed before every
            # request made inside that window -- confirm this is intended.
            if (datetime.now() - lastUpdate).total_seconds() <= 3600000:
                self._request('patch', '/userSessions', ignoreUpdate=True)
                self.lastUpdate = datetime.now()
            # While the API dont seems to implement more security, official client behavior must be respected.
            # Only disconnect after 1 day of inactivity for now.
            # TODO: Implement disconnection after 1 week of inactivity when implementing the rememberMe feature.
            #if 64800 >= (datetime.now() - lastUpdate).total_seconds() >= 85536:
            #    self._request('patch', '/userSessions', ignoreUpdate=True)
            else:
                raise neos_exceptions.InvalidToken("Token expired")
        args = {'url': CLOUDX_NEOS_API + path}
        if data:
            args['data'] = data
        if json:
            args['json'] = json
        if params:
            args['params'] = params
        func = getattr(self.session, verb, None)
        with func(**args) as req:
            logging.debug("NeosAPI: [{}] {}".format(req.status_code, args))
            if req.status_code not in [200, 204]:
                if "Invalid credentials" in req.text:
                    raise neos_exceptions.InvalidCredentials(req.text)
                elif req.status_code == 403:
                    raise neos_exceptions.InvalidToken(req.headers)
                else:
                    raise neos_exceptions.NeosAPIException(req)
            if req.status_code == 200:
                try:
                    response = req.json()
                except requests_exceptions.JSONDecodeError:
                    return req.text
                # Only a JSON object can carry an error "message" key; the
                # isinstance guard keeps list or scalar bodies from being
                # probed (and indexed) as if they were dicts.
                if isinstance(response, dict) and "message" in response:
                    raise neos_exceptions.NeosAPIException(req, message=response["message"])
                return response
            # In case of a 204 response
            return

    def login(self, data: LoginDetails) -> None:
        """Open a user session with *data* and store the returned credentials.

        The session headers are updated afterwards so that later requests
        carry the ``Authorization`` header.
        """
        response = self._request('post', "/userSessions",
            json=dataclasses.asdict(data))
        self.userId = response["userId"]
        self.token = response["token"]
        self.secretMachineId = response["secretMachineId"]
        self.expire = isoparse(response["expire"])
        self.lastUpdate = datetime.now()
        self.session.headers.update(self.headers)

    def logout(self) -> None:
        """Delete the current user session on the API, then clear local state."""
        self._request('delete',
            "/userSessions/{}/{}".format(self.userId, self.token),
            ignoreUpdate=True,
        )
        self.clean_session()

    def clean_session(self) -> None:
        """Forget every stored credential and reset the session headers.

        Safe to call even when no ``Authorization`` header was ever set.
        """
        self.userId = None
        self.token = None
        self.expire = None
        self.secretMachineId = None
        self.lastUpdate = None
        # pop() instead of del: the header is absent when the client never
        # authenticated, and del would raise KeyError in that case.
        self.session.headers.pop("Authorization", None)
        self.session.headers.update(self.headers)

    def loadToken(self):
        """Restore the credentials saved in the ``AUTHFILE_NAME`` file.

        Raises:
            neos_exceptions.NoTokenError: if the file does not exist or the
                saved ``expire`` time is not in the future.
        """
        if OSpath.exists(AUTHFILE_NAME):
            with open(AUTHFILE_NAME, "r") as f:
                session = json.load(f)
                expire = datetime.fromisoformat(session["expire"])
                if datetime.now().timestamp() < expire.timestamp():
                    self.token = session["token"]
                    self.userId = session["userId"]
                    self.expire = expire
                    self.secretMachineId = session["secretMachineId"]
                    self.session.headers.update(self.headers)
                else:
                    raise neos_exceptions.NoTokenError
        else:
            raise neos_exceptions.NoTokenError

    def saveToken(self):
        """Write the current credentials to the ``AUTHFILE_NAME`` file as JSON."""
        with open(AUTHFILE_NAME, "w+") as f:
            json.dump(
                {
                    "userId": self.userId,
                    "expire": self.expire.isoformat(),
                    "token": self.token,
                    "secretMachineId": self.secretMachineId,
                },
                f,
            )

    def neosDBSignature(self, url: str) -> str:
        """Return the part of *url* between the first ``//`` and the next ``.``.

        For ``neosdb:///abc.webp`` this is ``/abc`` (leading slash included).
        """
        return url.split("//")[1].split(".")[0]

    def neosDbToHttp(self, iconUrl: str) -> str:
        """Translate a ``neosdb://`` asset URL into its HTTPS asset URL.

        The signature from ``neosDBSignature`` is appended verbatim to the
        assets base URL.
        """
        url = "https://assets.neos.com/assets"
        url = url + self.neosDBSignature(iconUrl)
        return url

    def getUserData(self, user: str = None) -> NeosUser:
        """Return the data of *user*, defaulting to the logged-in user."""
        if user is None:
            user = self.userId
        response = self._request('get', "/users/" + user)
        return dacite.from_dict(NeosUser, response, DACITE_CONFIG)

    def getSession(self, session_id: str) -> NeosSession:
        """ Return session information.
        """
        response = self._request('get', f'/sessions/{session_id}')
        return dacite.from_dict(NeosSession, response, DACITE_CONFIG)

    def getUserStatus(self, user: str = None) -> NeosUserStatus:
        """Return the status of *user*, defaulting to the logged-in user."""
        if user is None:
            user = self.userId
        response = self._request('get', '/users/' + user + '/status/')
        return dacite.from_dict(NeosUserStatus, response, DACITE_CONFIG)

    def getFriends(self):
        """
        returns the friends you have.

        Note: does not create friends out of thin air. you need to do that yourself.
        """
        response = self._request('get', f"/users/{self.userId}/friends")
        return [dacite.from_dict(NeosFriend, user, DACITE_CONFIG) for user in response]

    def getInventory(self) -> List[NeosRecord]:
        """
        The typical entrypoint to the inventory system.
        """
        response = self._request(
            'get',
            f"/users/{self.userId}/records",
            params={"path": "Inventory"},
        )
        return self.processRecordList(response)

    def getDirectory(self, directory: NeosDirectory) -> List[NeosRecord]:
        """
        Given a directory, return its contents.
        """
        response = self._request(
            'get',
            f"/users/{directory.ownerId}/records",
            params={"path": directory.content_path},
        )
        return self.processRecordList(response)

    def resolveLink(self, link: NeosLink) -> NeosDirectory:
        """
        Given a link-type record, return its directory. The directory can be
        passed to getDirectory.
        """
        _, user, record = link.assetUri.path.split("/")  # TODO: better
        response = self._request(
            'get',
            f"/users/{user}/records/{record}",
        )
        return dacite.from_dict(NeosDirectory, response, DACITE_CONFIG)

    def buildMessage(
        self, sender_id: str, recipiend_id: str, msg: str
    ) -> NeosMessage:
        """Create a text ``NeosMessage`` from *sender_id* to *recipiend_id*.

        A fresh ``MSG-<uuid4>`` id and the current local time are generated;
        the sender is also recorded as the owner.
        """
        message_type = NeosMessageType.TEXT
        builded_msg = {
            'id': f'MSG-{uuid4()}',
            'senderId': sender_id,
            'ownerId': sender_id,
            'sendTime': datetime.now().isoformat(),
            'recipientId': recipiend_id,
            'messageType': message_type,
            'content': msg,
        }
        return dacite.from_dict(NeosMessage, builded_msg, DACITE_CONFIG)

    def sendMessageLegacy(
        self, sender_id: str, recipiend_id: str, msg: str
    ) -> None:
        """Post the text *msg* from *sender_id* to *recipiend_id*."""
        self._request(
            'post',
            f'/users/{recipiend_id}/messages',
            json=dataclasses.asdict(
                self.buildMessage(sender_id, recipiend_id, msg),
                dict_factory=nested_asdict_factory,
            )
        )

    def getMessageLegacy(
        self, fromTime: str = None, maxItems: int = 100,
        user: str = None, unreadOnly: bool = False
    ) -> Dict:
        """Fetch messages of the logged-in user.

        Args:
            fromTime: not supported yet; any truthy value raises ``ValueError``.
            maxItems: sent as the ``maxItems`` query parameter.
            user: when truthy, sent as the ``user`` query parameter.
            unreadOnly: sent as the ``unreadOnly`` query parameter.

        Returns:
            The response of ``_request``, unmodified.

        Raises:
            ValueError: if *fromTime* is given.
        """
        params = {}
        if fromTime:
            raise ValueError('fromTime is not yet implemented')
        params['maxItems'] = maxItems
        params['unreadOnly'] = unreadOnly
        if user:
            params['user'] = user
        return self._request(
            'get',
            f'/users/{self.userId}/messages',
            params=params
        )

    def getOwnerPath(self, ownerId: str) -> str:
        """Return the API path segment (``users`` or ``groups``) for *ownerId*.

        Raises:
            ValueError: when the owner type is neither a user nor a group.
        """
        ownerType = getOwnerType(ownerId)
        if ownerType == OwnerType.USER:
            return "users"
        elif ownerType == OwnerType.GROUP:
            return "groups"
        else:
            raise ValueError(f"invalid ownerType for {ownerId}")

    def listCloudVar(self, ownerId: str) -> List[NeosCloudVar]:
        """Return every cloud variable owned by *ownerId*."""
        response = self._request(
            'get',
            f'/{self.getOwnerPath(ownerId)}/{ownerId}/vars'
        )
        return [dacite.from_dict(NeosCloudVar, cloud_var, DACITE_CONFIG) for cloud_var in response]

    def getCloudVar(self, ownerId: str, path: str) -> NeosCloudVar:
        """Return the cloud variable stored at *path* for *ownerId*."""
        response = self._request(
            'get',
            f'/{self.getOwnerPath(ownerId)}/{ownerId}/vars/{path}'
        )
        return dacite.from_dict(NeosCloudVar, response, DACITE_CONFIG)

    def getCloudVarDefs(self, ownerId: str, path: str) -> NeosCloudVarDefs:
        """Return the cloud variable definition at *path* for *ownerId*."""
        response = self._request(
            'get',
            f'/{self.getOwnerPath(ownerId)}/{ownerId}/vardefs/{path}'
        )
        return dacite.from_dict(NeosCloudVarDefs, response, DACITE_CONFIG)

    def setCloudVar(self, ownerId: str, path: str, value: str) -> None:
        """Store *value* in the cloud variable *path* owned by *ownerId*.

        Whatever ``_request`` returns for the PUT call is passed back
        unchanged.
        """
        return self._request(
            'put',
            f'/{self.getOwnerPath(ownerId)}/{ownerId}/vars/{path}',
            json = {
                "ownerId": ownerId,
                "path": path,
                "value": value,
            }
        )

    def searchUser(self, username: str) -> dict:
        """
        Return the users matching *username*.

        This is not the U- NeosVR user id, the API will search over usernames not ids.
        """
        return self._request(
            'get',
            '/users',
            params = {'name': username}
        )

Class variables

var expire : datetime.datetime
var lastUpdate : datetime.datetime
var rememberMe : bool
var secretMachineId : str
var session : requests.sessions.Session
var token : str
var userId : str

Static methods

def processRecordList(data: List[dict])
Expand source code
@staticmethod
def processRecordList(data: List[dict]):
    """Turn raw record dicts into their type-specific record objects.

    Every entry is parsed once as a plain ``NeosRecord`` to read its
    ``recordType`` and then parsed again with the class that
    ``recordTypeMapping`` associates with that type.
    """
    def specialise(raw_record: dict):
        # First pass only exists to discover which concrete class to use.
        generic = dacite.from_dict(NeosRecord, raw_record, DACITE_CONFIG)
        record_cls = recordTypeMapping[generic.recordType]
        return dacite.from_dict(record_cls, raw_record, DACITE_CONFIG)

    return [specialise(raw_record) for raw_record in data]

Instance variables

var headers : dict
Expand source code
@property
def headers(self) -> dict:
    """Build the HTTP headers for the current authentication state.

    ``User-Agent`` is always set; ``Authorization`` is added only when both
    ``userId`` and ``token`` are truthy, otherwise a warning is logged.
    """
    result = {"User-Agent": f"neosvrpy/{__version__}"}
    if self.userId and self.token:
        result["Authorization"] = f"neos {self.userId}:{self.token}"
    else:
        logging.warning("WARNING: headers sections not set. this might throw an error soon...")
    return result

Methods

def buildMessage(self, sender_id: str, recipiend_id: str, msg: str) ‑> NeosMessage
Expand source code
def buildMessage(
    self, sender_id: str, recipiend_id: str, msg: str
) -> NeosMessage:
    """Create a text ``NeosMessage`` from *sender_id* to *recipiend_id*.

    A fresh ``MSG-<uuid4>`` id and the current local time are generated for
    the message; the sender is also recorded as its owner.
    """
    payload = {
        'id': f'MSG-{uuid4()}',
        'senderId': sender_id,
        'ownerId': sender_id,
        'sendTime': datetime.now().isoformat(),
        'recipientId': recipiend_id,
        'messageType': NeosMessageType.TEXT,
        'content': msg,
    }
    return dacite.from_dict(NeosMessage, payload, DACITE_CONFIG)
def clean_session(self) ‑> None
Expand source code
def clean_session(self) -> None:
    """Forget every stored credential and reset the session headers.

    Safe to call even when no ``Authorization`` header was ever set (for
    instance when the client never authenticated).
    """
    self.userId = None
    self.token = None
    self.expire = None
    self.secretMachineId = None
    self.lastUpdate = None
    # pop() instead of del: the header is absent when the client never
    # authenticated, and del would raise KeyError in that case.
    self.session.headers.pop("Authorization", None)
    self.session.headers.update(self.headers)
def getCloudVar(self, ownerId: str, path: str) ‑> NeosCloudVar
Expand source code
def getCloudVar(self, ownerId: str, path: str) -> NeosCloudVar:
    """Fetch the cloud variable stored at *path* for *ownerId*.

    The first path segment of the endpoint comes from ``getOwnerPath``.
    """
    owner_root = self.getOwnerPath(ownerId)
    endpoint = f'/{owner_root}/{ownerId}/vars/{path}'
    raw_var = self._request('get', endpoint)
    return dacite.from_dict(NeosCloudVar, raw_var, DACITE_CONFIG)
def getCloudVarDefs(self, ownerId: str, path: str) ‑> NeosCloudVarDefs
Expand source code
def getCloudVarDefs(self, ownerId: str, path: str) -> NeosCloudVarDefs:
    """Fetch the cloud variable definition stored at *path* for *ownerId*.

    The first path segment of the endpoint comes from ``getOwnerPath``.
    """
    owner_root = self.getOwnerPath(ownerId)
    endpoint = f'/{owner_root}/{ownerId}/vardefs/{path}'
    raw_defs = self._request('get', endpoint)
    return dacite.from_dict(NeosCloudVarDefs, raw_defs, DACITE_CONFIG)
def getDirectory(self, directory: NeosDirectory) ‑> List[NeosRecord]

Given a directory, return its contents.

Expand source code
def getDirectory(self, directory: NeosDirectory) -> List[NeosRecord]:
    """
    Return the records contained in *directory*.

    The records are requested from the directory owner, filtered by the
    directory's ``content_path``.
    """
    endpoint = f"/users/{directory.ownerId}/records"
    query = {"path": directory.content_path}
    raw_records = self._request('get', endpoint, params=query)
    return self.processRecordList(raw_records)
def getFriends(self)

returns the friends you have.

Note: does not create friends out of thin air. you need to do that yourself.

Expand source code
def getFriends(self):
    """
    Return the friends of the logged-in user as ``NeosFriend`` objects.

    Note: this only lists existing friends, it never creates any.
    """
    friends = []
    for raw_friend in self._request('get', f"/users/{self.userId}/friends"):
        friends.append(dacite.from_dict(NeosFriend, raw_friend, DACITE_CONFIG))
    return friends
def getInventory(self) ‑> List[NeosRecord]

The typical entrypoint to the inventory system.

Expand source code
def getInventory(self) -> List[NeosRecord]:
    """
    Return the records at the root ``Inventory`` path of the logged-in user.

    This is the typical entrypoint to the inventory system.
    """
    endpoint = f"/users/{self.userId}/records"
    raw_records = self._request('get', endpoint, params={"path": "Inventory"})
    return self.processRecordList(raw_records)
def getMessageLegacy(self, fromTime: str = None, maxItems: int = 100, user: str = None, unreadOnly: bool = False) ‑> None
Expand source code
def getMessageLegacy(
    self, fromTime: str = None, maxItems: int = 100,
    user: str = None, unreadOnly: bool = False
) -> Dict:
    """Fetch messages of the logged-in user.

    Args:
        fromTime: not supported yet; any truthy value raises ``ValueError``.
        maxItems: sent as the ``maxItems`` query parameter.
        user: when truthy, sent as the ``user`` query parameter.
        unreadOnly: sent as the ``unreadOnly`` query parameter.

    Returns:
        The response of ``_request``, unmodified (the previous ``-> None``
        annotation contradicted this).

    Raises:
        ValueError: if *fromTime* is given.
    """
    if fromTime:
        raise ValueError('fromTime is not yet implemented')
    params = {'maxItems': maxItems, 'unreadOnly': unreadOnly}
    if user:
        params['user'] = user
    return self._request(
        'get',
        f'/users/{self.userId}/messages',
        params=params
    )
def getOwnerPath(self, ownerId: str) ‑> str
Expand source code
def getOwnerPath(self, ownerId: str) -> str:
    """Return the API path segment (``users`` or ``groups``) for *ownerId*.

    Raises:
        ValueError: when the owner type is neither a user nor a group.
    """
    kind = getOwnerType(ownerId)
    if kind == OwnerType.USER:
        return "users"
    if kind == OwnerType.GROUP:
        return "groups"
    raise ValueError(f"invalid ownerType for {ownerId}")
def getSession(self, session_id: str) ‑> Session

Return session information.

Expand source code
def getSession(self, session_id: str) -> NeosSession:
    """Return the information of the session identified by *session_id*."""
    endpoint = f'/sessions/{session_id}'
    payload = self._request('get', endpoint)
    return dacite.from_dict(NeosSession, payload, DACITE_CONFIG)
def getUserData(self, user: str = None) ‑> NeosUser
Expand source code
def getUserData(self, user: str = None) -> NeosUser:
    """Return the data of *user*, defaulting to the logged-in user."""
    target = self.userId if user is None else user
    payload = self._request('get', "/users/" + target)
    return dacite.from_dict(NeosUser, payload, DACITE_CONFIG)
def getUserStatus(self, user: str = None) ‑> NeosUser
Expand source code
def getUserStatus(self, user: str = None) -> NeosUserStatus:
    """Return the status of *user*, defaulting to the logged-in user.

    The result is a ``NeosUserStatus`` (the previous ``NeosUser`` return
    annotation did not match the object actually built).
    """
    if user is None:
        user = self.userId
    response = self._request('get', '/users/' + user + '/status/')
    return dacite.from_dict(NeosUserStatus, response, DACITE_CONFIG)
def listCloudVar(self, ownerId: str) ‑> List[NeosCloudVar]
Expand source code
def listCloudVar(self, ownerId: str) -> List[NeosCloudVar]:
    """Return every cloud variable owned by *ownerId*.

    The first path segment of the endpoint comes from ``getOwnerPath``.
    """
    endpoint = f'/{self.getOwnerPath(ownerId)}/{ownerId}/vars'
    cloud_vars = []
    for raw_var in self._request('get', endpoint):
        cloud_vars.append(dacite.from_dict(NeosCloudVar, raw_var, DACITE_CONFIG))
    return cloud_vars
def loadToken(self)
Expand source code
def loadToken(self):
    """Restore the credentials saved in the ``AUTHFILE_NAME`` file.

    Raises:
        neos_exceptions.NoTokenError: if the file does not exist or the
            saved ``expire`` time is not in the future.
    """
    if not OSpath.exists(AUTHFILE_NAME):
        raise neos_exceptions.NoTokenError
    with open(AUTHFILE_NAME, "r") as token_file:
        saved = json.load(token_file)
        expire = datetime.fromisoformat(saved["expire"])
        # A saved session that is no longer valid counts as "no token".
        if not datetime.now().timestamp() < expire.timestamp():
            raise neos_exceptions.NoTokenError
        self.token = saved["token"]
        self.userId = saved["userId"]
        self.expire = expire
        self.secretMachineId = saved["secretMachineId"]
        self.session.headers.update(self.headers)
def login(self, data: LoginDetails) ‑> None
Expand source code
def login(self, data: LoginDetails) -> None:
    """Open a user session with *data* and store the returned credentials.

    The session headers are updated afterwards so that later requests carry
    the ``Authorization`` header.
    """
    credentials = dataclasses.asdict(data)
    reply = self._request('post', "/userSessions", json=credentials)
    self.userId = reply["userId"]
    self.token = reply["token"]
    self.secretMachineId = reply["secretMachineId"]
    self.expire = isoparse(reply["expire"])
    # Marks the start of the window checked by _request before each call.
    self.lastUpdate = datetime.now()
    self.session.headers.update(self.headers)
def logout(self) ‑> None
Expand source code
def logout(self) -> None:
    """Delete the current user session on the API, then clear local state."""
    endpoint = f"/userSessions/{self.userId}/{self.token}"
    # ignoreUpdate: no point refreshing a session that is being deleted.
    self._request('delete', endpoint, ignoreUpdate=True)
    self.clean_session()
def neosDBSignature(self, url: str) ‑> str
Expand source code
def neosDBSignature(self, url: str) -> str:
    """Return the part of *url* between the first ``//`` and the next ``.``.

    For ``neosdb:///abc.webp`` this is ``/abc`` (leading slash included).
    """
    after_scheme = url.split("//")[1]
    signature, *_ = after_scheme.split(".")
    return signature
def neosDbToHttp(self, iconUrl: str) ‑> str
Expand source code
def neosDbToHttp(self, iconUrl: str) -> str:
    """Translate a ``neosdb://`` asset URL into its HTTPS asset URL.

    The signature returned by ``neosDBSignature`` is appended verbatim to
    the assets base URL.
    """
    signature = self.neosDBSignature(iconUrl)
    return "https://assets.neos.com/assets" + signature

Given a link-type record, return its directory. The directory can be passed to getDirectory.

Expand source code
def resolveLink(self, link: NeosLink) -> NeosDirectory:
    """
    Given a link-type record, return the directory it points to.

    The returned directory can be passed to getDirectory. The path of
    ``link.assetUri`` must split on ``/`` into exactly three parts, otherwise
    unpacking raises ``ValueError``.
    """
    segments = link.assetUri.path.split("/")  # TODO: better
    _, owner, record_id = segments
    payload = self._request('get', f"/users/{owner}/records/{record_id}")
    return dacite.from_dict(NeosDirectory, payload, DACITE_CONFIG)
def saveToken(self)
Expand source code
def saveToken(self):
    """Write the current credentials to the ``AUTHFILE_NAME`` file as JSON."""
    with open(AUTHFILE_NAME, "w+") as token_file:
        payload = {
            "userId": self.userId,
            "expire": self.expire.isoformat(),
            "token": self.token,
            "secretMachineId": self.secretMachineId,
        }
        json.dump(payload, token_file)
def searchUser(self, username: str) ‑> dict

return a list of user based on username.

This is not the U- NeosVR user id, the API will search over usernames not ids.

Expand source code
def searchUser(self, username: str) -> dict:
    """
    Search users by *username* and return the API response.

    This is not the U- NeosVR user id, the API will search over usernames not ids.
    """
    query = {'name': username}
    return self._request('get', '/users', params=query)
def sendMessageLegacy(self, sender_id: str, recipiend_id: str, msg: str) ‑> None
Expand source code
def sendMessageLegacy(
    self, sender_id: str, recipiend_id: str, msg: str
) -> None:
    """Post the text *msg* from *sender_id* to *recipiend_id*.

    The message is built with ``buildMessage`` and serialised with
    ``nested_asdict_factory`` before being sent as the JSON body.
    """
    message = self.buildMessage(sender_id, recipiend_id, msg)
    body = dataclasses.asdict(message, dict_factory=nested_asdict_factory)
    self._request('post', f'/users/{recipiend_id}/messages', json=body)
def setCloudVar(self, ownerId: str, path: str, value: str) ‑> None
Expand source code
def setCloudVar(self, ownerId: str, path: str, value: str) -> None:
    """Store *value* in the cloud variable *path* owned by *ownerId*.

    Whatever ``_request`` returns for the PUT call is passed back unchanged.
    """
    endpoint = f'/{self.getOwnerPath(ownerId)}/{ownerId}/vars/{path}'
    body = {"ownerId": ownerId, "path": path, "value": value}
    return self._request('put', endpoint, json=body)