mirror of https://github.com/caronc/apprise
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
1527 lines
48 KiB
# -*- coding: utf-8 -*-
# BSD 2-Clause License
#
# Apprise - Push Notification Library.
# Copyright (c) 2024, Chris Caron <lead2gold@gmail.com>
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
#    this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
#    this list of conditions and the following disclaimer in the documentation
#    and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
|
|
|
import time |
|
import os |
|
import sys |
|
import zlib |
|
import pytest |
|
import shutil |
|
import json |
|
import gzip |
|
from unittest import mock |
|
from datetime import datetime, timedelta, timezone |
|
from apprise import exception |
|
from apprise.asset import AppriseAsset |
|
from apprise.persistent_store import ( |
|
CacheJSONEncoder, CacheObject, PersistentStore, PersistentStoreMode) |
|
|
|
# Silence the logger so test output stays clean
import logging
logging.disable(level=logging.CRITICAL)
|
|
|
# Directory holding the attachment fixtures used by these tests
TEST_VAR_DIR = os.path.join(os.path.dirname(__file__), 'var')
|
|
|
|
|
def test_persistent_storage_asset(tmpdir):
    """
    Verify the AppriseAsset object wires up the Persistent Store settings.
    """

    # Supplying a storage path leaves the mode at its AUTO default
    configured = AppriseAsset(storage_path=str(tmpdir))
    assert configured.storage_path == str(tmpdir)
    assert configured.storage_mode is PersistentStoreMode.AUTO

    # Without a storage path we are always limited to memory mode
    memory_only = AppriseAsset(
        storage_path=None, storage_mode=PersistentStoreMode.MEMORY)
    assert memory_only.storage_path is None
    assert memory_only.storage_mode is PersistentStoreMode.MEMORY
|
|
|
|
|
def test_disabled_persistent_storage(tmpdir):
    """
    Exercise the persistent store while restricted to memory-only mode;
    nothing may ever be written to disk.
    """
    # Create ourselves a store locked into Memory Mode only
    store = PersistentStore(
        namespace='abc', path=str(tmpdir), mode=PersistentStoreMode.MEMORY)
    assert store.read() is None
    assert store.read('mykey') is None
    with pytest.raises(AttributeError):
        # Invalid key specified
        store.read('!invalid')
    assert store.write('data') is False
    assert store.get('key') is None
    assert store.set('key', 'value')
    assert store.get('key') == 'value'

    assert store.set('key2', 'value')
    # Clearing a never-set key alongside a set one is harmless
    store.clear('key', 'key-not-previously-set')
    assert store.get('key2') == 'value'
    assert store.get('key') is None

    # Set it again
    assert store.set('key', 'another-value')
    # A bare clear() wipes everything
    store.clear()
    assert store.get('key2') is None
    assert store.get('key') is None
    # A second call to clear on an already empty cache set
    store.clear()

    # No dirty flag is set as there is nothing to write to disk
    store.set('not-persistent', 'value', persistent=False)
    del store['not-persistent']
    with pytest.raises(KeyError):
        # Can't delete it twice
        del store['not-persistent']

    # A persistent key
    store.set('persistent', 'value')
    # Removes it and sets/clears the dirty flag
    del store['persistent']

    # After all of the above, nothing was done to the directory
    assert len(os.listdir(str(tmpdir))) == 0

    with pytest.raises(AttributeError):
        # An unrecognized persistent store mode is rejected outright
        PersistentStore(
            namespace='abc', path=str(tmpdir), mode='garbage')
|
|
|
|
|
def test_persistent_storage_init(tmpdir):
    """
    Test storage initialization; invalid namespaces are rejected.
    """
    # Empty/None entries, bare punctuation, leading punctuation, and
    # unsupported characters all raise an AttributeError
    for bad_namespace in (
            "", None, "_", ".", "-", "_abc", ".abc", "-abc", "%"):
        with pytest.raises(AttributeError):
            PersistentStore(namespace=bad_namespace, path=str(tmpdir))
|
|
|
|
|
def test_persistent_storage_general(tmpdir):
    """
    General persistent storage testing across memory and auto modes.
    """
    namespace = 'abc'
    # A store created with no path at all
    store = PersistentStore()

    # Default mode when a path is not provided
    assert store.mode == PersistentStoreMode.MEMORY

    assert store.size() == 0
    assert store.files() == []
    assert store.files(exclude=True, lazy=False) == []
    assert store.files(exclude=False, lazy=False) == []
    store.set('key', 'value')
    # There is no disk size utilized
    assert store.size() == 0
    assert store.files(exclude=True, lazy=False) == []
    assert store.files(exclude=False, lazy=False) == []

    # Now attach our store to an actual directory
    store = PersistentStore(
        namespace=namespace, path=str(tmpdir))

    # Default mode when a path is provided
    assert store.mode == PersistentStoreMode.AUTO

    # Get our path associated with our Persistent Store
    assert store.path == os.path.join(str(tmpdir), 'abc')

    # Expiry testing: a datetime is acceptable ...
    assert store.set('key', 'value', datetime.now() + timedelta(hours=1))
    # ... and so is a number of seconds (1 min in the future)
    assert store.set('key', 'value', 60)

    with pytest.raises(AttributeError):
        # Anything else is not
        assert store.set('key', 'value', 'invalid')

    store = PersistentStore(
        namespace=namespace, path=str(tmpdir))

    # Our key is still valid and we load it from disk
    assert store.get('key') == 'value'
    assert store['key'] == 'value'

    store = PersistentStore(
        namespace=namespace, path=str(tmpdir))
    assert store.keys()
    # Second call after already initialized skips over initialization
    assert store.keys()

    store = PersistentStore(
        namespace=namespace, path=str(tmpdir))

    with pytest.raises(KeyError):
        # Keys that were never assigned raise
        store['unassigned_key']
|
|
|
|
|
def test_persistent_storage_auto_mode(tmpdir):
    """
    Persistent Storage Auto Write Testing

    Exercises delete() filtering (all/temp/cache/key) in AUTO mode.
    """
    namespace = 'abc'
    # Create ourselves a persistent store in AUTO mode
    pc = PersistentStore(
        namespace=namespace, path=str(tmpdir),
        mode=PersistentStoreMode.AUTO)

    pc.write(b'test')
    # A vanished file during cleanup is not an error
    with mock.patch('os.unlink', side_effect=FileNotFoundError()):
        assert pc.delete(all=True) is True

    # Create a temporary file we can delete
    with open(os.path.join(pc.path, pc.temp_dir, 'test.file'), 'wb') as fd:
        fd.write(b'data')

    # Delete just the temporary files
    assert pc.delete(temp=True) is True

    # Create a cache entry and delete it
    assert pc.set('key', 'value') is True
    pc.write(b'test')
    assert pc.delete(cache=True) is True
    # Verify our data entry wasn't removed
    assert pc.read() == b'test'
    # But our cache was
    assert pc.get('key') is None

    # A reverse of the above... create a cache and data variable and
    # clear the data; make sure our cache is still there
    assert pc.set('key', 'value') is True
    # Bug fix: this was previously a bare `pc.write(...) is True`
    # expression (a no-op); the assert was missing
    assert pc.write(b'test', key='iokey') is True
    assert pc.delete('iokey') is True
    assert pc.get('key') == 'value'
    assert pc.read('iokey') is None
|
|
|
|
|
def test_persistent_storage_flush_mode(tmpdir):
    """
    Persistent Storage Forced Write Testing

    Uses PersistentStoreMode.FLUSH, so every change is committed to disk
    immediately; verifies on-disk layout, raw I/O, and delete() filtering.
    """
    namespace = 'abc'
    # Create ourselves a store that flushes on every change
    pc = PersistentStore(
        namespace=namespace, path=str(tmpdir),
        mode=PersistentStoreMode.FLUSH)

    # Reference path
    path = os.path.join(str(tmpdir), namespace)

    assert pc.size() == 0
    assert list(pc.files()) == []

    # Key is not set yet
    assert pc.get('key') is None
    assert len(pc.keys()) == 0
    assert 'key' not in pc

    # Verify our data is set
    assert pc.set('key', 'value')
    assert len(pc.keys()) == 1
    assert 'key' in list(pc.keys())

    assert pc.size() > 0
    assert len(pc.files()) == 1

    # Second call uses Lazy cache
    # Just our cache file
    assert len(pc.files()) == 1

    # Setting the same value again uses a lazy mode and
    # bypasses all of the write overhead
    assert pc.set('key', 'value')

    path_content = os.listdir(path)
    # var, cache.psdata, and tmp
    assert len(path_content) == 3

    # Assignments (causes another disk write)
    pc['key'] = 'value2'

    # Setting the same value and explicitly marking the field as not being
    # persistent
    pc.set('key-xx', 'abc123', persistent=False)
    # Changing its value doesn't alter the persistent flag
    pc['key-xx'] = 'def678'
    # Setting it twice
    pc['key-xx'] = 'def678'

    # Our retrievals
    assert pc['key-xx'] == 'def678'
    assert pc.get('key-xx') == 'def678'

    # But on the destruction of our object, it is not available again
    del pc
    # Create ourselves a fresh store
    pc = PersistentStore(
        namespace=namespace, path=str(tmpdir),
        mode=PersistentStoreMode.FLUSH)

    assert pc.get('key-xx') is None
    with pytest.raises(KeyError):
        pc['key-xx']

    # Now our key is set
    assert 'key' in pc
    assert pc.get('key') == 'value2'

    # A directory was created identified by the namespace
    assert len(os.listdir(str(tmpdir))) == 1
    assert namespace in os.listdir(str(tmpdir))

    path_content = os.listdir(path)
    assert len(path_content) == 4

    # Another write doesn't change the file count
    pc['key'] = 'value3'
    path_content = os.listdir(path)
    assert len(path_content) == 4

    # Our temporary directory used for all file handling in this namespace
    assert pc.temp_dir in path_content
    # Our cache file
    assert os.path.basename(pc.cache_file) in path_content

    path = os.path.join(pc.path, pc.temp_dir)
    path_content = os.listdir(path)

    # We always do our best to clean any temporary files up
    assert len(path_content) == 0

    # Destroy our object
    del pc

    # Re-initialize it
    pc = PersistentStore(
        namespace=namespace, path=str(tmpdir),
        mode=PersistentStoreMode.FLUSH)

    # Our key is persistent and available right away
    assert pc.get('key') == 'value3'
    assert 'key' in pc

    # Remove our item
    del pc['key']
    assert pc.size() == 0
    assert 'key' not in pc

    # Raw writes accept both str and bytes; reads always return bytes
    assert pc.write('data') is True
    assert pc.read() == b'data'
    assert pc.write(b'data') is True
    assert pc.read() == b'data'

    assert pc.read('default') == b'data'
    assert pc.write('data2', key='mykey') is True
    assert pc.read('mykey') == b'data2'

    # We can selectively delete our key
    assert pc.delete('mykey')
    assert pc.read('mykey') is None
    # Other keys are not touched
    assert pc.read('default') == b'data'
    assert pc.read() == b'data'
    # Full purge
    assert pc.delete()
    assert pc.read('mykey') is None
    assert pc.read() is None

    # Practice with files
    with open(os.path.join(TEST_VAR_DIR, 'apprise-test.gif'), 'rb') as fd:
        assert pc.write(fd, key='mykey', compress=False) is True

        # Read our content back
        fd.seek(0)
        assert pc.read('mykey', compress=False) == fd.read()

    with open(os.path.join(TEST_VAR_DIR, 'apprise-test.gif'), 'rb') as fd:
        assert pc.write(fd, key='mykey', compress=True) is True

        # Read our content back; content will be compressed
        fd.seek(0)
        assert pc.read('mykey', compress=True) == fd.read()

    class Foobar:
        def read(*args, **kwargs):
            return 42

    foobar = Foobar()
    # read() returns a non string/bin
    with pytest.raises(exception.AppriseDiskIOError):
        pc.write(foobar, key='foobar', compress=True)
    assert pc.read('foobar') is None

    class Foobar:
        def read(*args, **kwargs):
            return 'good'

    foobar = Foobar()
    # read() returns a string so the below write works
    assert pc.write(foobar, key='foobar', compress=True)
    assert pc.read('foobar') == b'good'
    pc.delete()

    class Foobar:
        def read(*args, **kwargs):
            # Throw an exception
            raise TypeError()

    foobar = Foobar()
    # read() raises, surfacing as a disk I/O error
    with pytest.raises(exception.AppriseDiskIOError):
        pc.write(foobar, key='foobar', compress=True)
    assert pc.read('foobar') is None

    # Set our max_file_size
    _prev_max_file_size = pc.max_file_size
    pc.max_file_size = 1
    assert pc.delete()

    # Payload exceeds the 1-byte cap
    assert pc.write('data') is False
    assert pc.read() is None

    # Restore setting
    pc.max_file_size = _prev_max_file_size

    # Reset
    pc.delete()

    assert pc.write('data')
    # Corrupt our data (duplicate a slice of the raw on-disk content)
    data = pc.read(compress=False)[:20] + pc.read(compress=False)[:10]
    pc.write(data, compress=False)

    # Now we'll get an exception reading back the corrupted data
    assert pc.read() is None

    # Keep in mind though the data is still there; operator should write
    # and read the way they expect to and things will work out fine
    # This test just proves that Apprise Persistent storage still
    # gracefully handles bad data
    assert pc.read(compress=False) == data

    # No key exists also returns None
    assert pc.read('no-key-exists') is None

    pc.write(b'test')
    pc['key'] = 'value'
    with mock.patch('os.unlink', side_effect=FileNotFoundError()):
        assert pc.delete(all=True) is True
    with mock.patch('os.unlink', side_effect=OSError()):
        assert pc.delete(all=True) is False

    # Create a temporary file we can delete
    tmp_file = os.path.join(pc.path, pc.temp_dir, 'test.file')
    with open(tmp_file, 'wb') as fd:
        fd.write(b'data')

    assert pc.set('key', 'value') is True
    # Bug fix: this line previously lacked its `assert` and was a no-op
    assert pc.write(b'test', key='iokey') is True
    # Delete just the temporary files
    assert pc.delete(temp=True) is True
    assert os.path.exists(tmp_file) is False
    # our other entries are untouched
    assert pc.get('key') == 'value'
    assert pc.read('iokey') == b'test'

    # Create a cache entry and delete it
    assert pc.set('key', 'value') is True
    pc.write(b'test')
    assert pc.delete(cache=True) is True
    # Verify our data entry wasn't removed
    assert pc.read() == b'test'
    # But our cache was
    assert pc.get('key') is None

    # A reverse of the above... create a cache and data variable and
    # clear the data; make sure our cache is still there
    assert pc.set('key', 'value') is True
    # Bug fix: this line previously lacked its `assert` and was a no-op
    assert pc.write(b'test', key='iokey') is True
    assert pc.delete('iokey') is True
    assert pc.get('key') == 'value'
    assert pc.read('iokey') is None

    # Create some custom files
    cust1_file = os.path.join(pc.path, 'test.file')
    cust2_file = os.path.join(pc.path, pc.data_dir, 'test.file')
    with open(cust1_file, 'wb') as fd:
        fd.write(b'data')
    with open(cust2_file, 'wb') as fd:
        fd.write(b'data')

    # Even after a full flush our files will exist
    assert pc.delete()
    assert os.path.exists(cust1_file) is True
    assert os.path.exists(cust2_file) is True

    # However, if we turn off validate, we do a full sweep because these
    # unknown files are lingering in our directory space
    assert pc.delete(validate=False)
    assert os.path.exists(cust1_file) is False
    assert os.path.exists(cust2_file) is False

    pc['key'] = 'value'
    pc['key2'] = 'value2'
    assert 'key' in pc
    assert 'key2' in pc
    pc.clear('key')
    assert 'key' not in pc
    assert 'key2' in pc

    # Set expired content
    pc.set(
        'expired', 'expired-content',
        expires=datetime.now() - timedelta(days=1))

    # It's actually there... but it's expired so our persistent
    # storage is behaving as it should
    assert 'expired' not in pc
    assert pc.get('expired') is None
    # Prune our content
    pc.prune()
|
|
|
|
|
def test_persistent_storage_corruption_handling(tmpdir):
    """
    Verify the persistent store gracefully survives corrupted cache files
    and simulated disk failures.

    NOTE(review): the original indentation was lost in transit; the
    `with`-block scoping below was reconstructed from the surviving
    statement order — confirm against upstream.
    """

    # Namespace
    namespace = 'def456'

    # Initialize it
    pc = PersistentStore(
        namespace=namespace, path=str(tmpdir),
        mode=PersistentStoreMode.FLUSH)

    cache_file = pc.cache_file
    assert not os.path.isfile(cache_file)

    # Store our key
    pc['mykey'] = 42
    assert os.path.isfile(cache_file)

    with gzip.open(cache_file, 'rb') as f:
        # Read our content from disk; it must be valid JSON
        json.loads(f.read().decode('utf-8'))

    # Remove object
    del pc

    # Corrupt the file (not even a valid gzip stream)
    with open(cache_file, 'wb') as f:
        f.write(b'{')

    pc = PersistentStore(
        namespace=namespace, path=str(tmpdir),
        mode=PersistentStoreMode.FLUSH)

    # File is corrupted
    assert 'mykey' not in pc
    pc['mykey'] = 42
    del pc

    # File is corrected now
    pc = PersistentStore(
        namespace=namespace, path=str(tmpdir),
        mode=PersistentStoreMode.FLUSH)

    assert 'mykey' in pc

    # Corrupt the file again (valid gzip stream, broken JSON)
    with gzip.open(cache_file, 'wb') as f:
        f.write(b'{')

    pc = PersistentStore(
        namespace=namespace, path=str(tmpdir),
        mode=PersistentStoreMode.FLUSH)

    # File is corrupted
    assert 'mykey' not in pc
    pc['mykey'] = 42
    del pc

    pc = PersistentStore(
        namespace=namespace, path=str(tmpdir),
        mode=PersistentStoreMode.FLUSH)

    # Test our force flush
    assert pc.flush(force=True) is True
    # double call
    assert pc.flush(force=True) is True

    # OSError during open aborts assignment
    with mock.patch('gzip.open', side_effect=OSError()):
        with pytest.raises(KeyError):
            pc['mykey'] = 43

    pc = PersistentStore(
        namespace=namespace, path=str(tmpdir),
        mode=PersistentStoreMode.FLUSH)

    with mock.patch('gzip.open', side_effect=OSError()):
        # No keys can be returned
        assert not pc.keys()

    pc = PersistentStore(
        namespace=namespace, path=str(tmpdir),
        mode=PersistentStoreMode.FLUSH)

    with mock.patch('json.loads', side_effect=TypeError()):
        with mock.patch('os.unlink', side_effect=FileNotFoundError()):
            with pytest.raises(KeyError):
                pc['mykey'] = 44

    pc = PersistentStore(
        namespace=namespace, path=str(tmpdir),
        mode=PersistentStoreMode.FLUSH)

    with mock.patch('json.loads', side_effect=TypeError()):
        with mock.patch('os.unlink', side_effect=OSError()):
            with pytest.raises(KeyError):
                pc['mykey'] = 45

    pc['my-new-key'] = 43
    with mock.patch('gzip.open', side_effect=OSError()):
        # We will fail to flush our content to disk
        assert pc.flush(force=True) is False

    with mock.patch('json.dumps', side_effect=TypeError()):
        # We will fail to flush our content to disk
        assert pc.flush(force=True) is False

    with mock.patch('os.makedirs', side_effect=OSError()):
        pc = PersistentStore(
            namespace=namespace, path=str(tmpdir),
            mode=PersistentStoreMode.FLUSH)

        # Directory initialization failed so we fall back to memory mode
        assert pc.mode == PersistentStoreMode.MEMORY

    # Handle file updates
    pc = PersistentStore(
        namespace='file-time-refresh', path=str(tmpdir),
        mode=PersistentStoreMode.AUTO)

    pc['test'] = 'abcd'
    assert pc.write(b'data', key='abcd') is True
    assert pc.read('abcd', expires=True) == b'data'
    assert pc.write(b'data2', key='defg') is True
    assert pc.read('defg', expires=False) == b'data2'
    assert pc.write(b'data3', key='hijk') is True
    assert pc.read('hijk', expires=False) == b'data3'
    assert pc['test'] == 'abcd'

    with mock.patch('os.utime', side_effect=(OSError(), FileNotFoundError())):
        pc.flush()

    # directory initialization okay
    pc = PersistentStore(
        namespace=namespace, path=str(tmpdir),
        mode=PersistentStoreMode.FLUSH)

    assert 'mykey' not in pc
    pc['mykey'] = 42
    del pc

    pc = PersistentStore(
        namespace=namespace, path=str(tmpdir),
        mode=PersistentStoreMode.FLUSH)
    assert 'mykey' in pc

    # Remove the last entry
    del pc['mykey']
    with mock.patch('os.rename', side_effect=OSError()):
        with mock.patch('os.unlink', side_effect=OSError()):
            assert not pc.flush(force=True)

    # Create another entry
    pc['mykey'] = 42
    with mock.patch('tempfile.NamedTemporaryFile', side_effect=OSError()):
        assert not pc.flush(force=True)

    # Temporary file cleanup failure
    with mock.patch('tempfile._TemporaryFileWrapper.close',
                    side_effect=OSError()):
        assert not pc.flush(force=True)

    # Create another entry
    pc['mykey'] = 43
    mock_ntf = mock.MagicMock()
    mock_ntf.name = os.path.join(tmpdir, 'file')

    #
    # Recursion loop checking
    #
    with mock.patch(
            'tempfile.NamedTemporaryFile',
            side_effect=[FileNotFoundError(), FileNotFoundError(), mock_ntf]):
        # No way to have recursion loop
        assert not pc.flush(force=True, _recovery=True)

    with mock.patch(
            'tempfile.NamedTemporaryFile',
            side_effect=[FileNotFoundError(), FileNotFoundError(), mock_ntf]):
        # No way to have recursion loop
        assert not pc.flush(force=False, _recovery=True)

    with mock.patch(
            'tempfile.NamedTemporaryFile',
            side_effect=[FileNotFoundError(), FileNotFoundError(), mock_ntf]):
        # No way to have recursion loop
        assert not pc.flush(force=False, _recovery=False)

    with mock.patch(
            'tempfile.NamedTemporaryFile',
            side_effect=[FileNotFoundError(), FileNotFoundError(), mock_ntf]):
        # No way to have recursion loop
        assert not pc.flush(force=True, _recovery=False)

    with mock.patch('tempfile._TemporaryFileWrapper.close',
                    side_effect=(OSError(), None)):
        with mock.patch('os.unlink', side_effect=(OSError())):
            assert not pc.flush(force=True)

    with mock.patch(
            'tempfile._TemporaryFileWrapper.close', side_effect=OSError()):
        assert not pc.flush(force=True)

    with mock.patch(
            'tempfile._TemporaryFileWrapper.close',
            side_effect=(OSError(), None)):
        with mock.patch('os.unlink', side_effect=OSError()):
            assert not pc.flush(force=True)

    with mock.patch(
            'tempfile._TemporaryFileWrapper.close',
            side_effect=(OSError(), None)):
        with mock.patch('os.unlink', side_effect=FileNotFoundError()):
            assert not pc.flush(force=True)

    del pc

    # directory initialization okay
    pc = PersistentStore(
        namespace=namespace, path=str(tmpdir),
        mode=PersistentStoreMode.FLUSH)

    # Allows us to play with encoding errors
    pc.encoding = 'ascii'

    # Handle write() calls
    with mock.patch('os.stat', side_effect=OSError()):
        # We fail to fetch the filesize of our old file causing us to fail
        assert pc.write('abcd') is False

    # ボールト translates to vault (no bad word here) :)
    data = "ボールト"

    # We'll have encoding issues
    assert pc.write(data) is False

    with mock.patch('gzip.open', side_effect=FileNotFoundError()):
        pc = PersistentStore(namespace=namespace, path=str(tmpdir))

        # recovery mode will kick in and even it will fail
        assert pc.write(b'key') is False

    with mock.patch('gzip.open', side_effect=OSError()):
        pc = PersistentStore(namespace=namespace, path=str(tmpdir))

        # Falls to default
        assert pc.get('key') is None

        pc = PersistentStore(namespace=namespace, path=str(tmpdir))
        with pytest.raises(KeyError):
            pc['key'] = 'value'

        pc = PersistentStore(namespace=namespace, path=str(tmpdir))
        with pytest.raises(KeyError):
            pc['key']

        pc = PersistentStore(namespace=namespace, path=str(tmpdir))
        with pytest.raises(KeyError):
            del pc['key']

        pc = PersistentStore(namespace=namespace, path=str(tmpdir))
        # Fails to set key
        assert pc.set('key', 'value') is False

        pc = PersistentStore(namespace=namespace, path=str(tmpdir))
        # Fails to clear
        assert pc.clear() is False

        pc = PersistentStore(namespace=namespace, path=str(tmpdir))
        # Fails to prune
        assert pc.prune() is False

        # Set some expired content
        pc.set(
            'key', 'value', persistent=False,
            expires=datetime.now() - timedelta(days=1))
        pc.set(
            'key2', 'value2', persistent=True,
            expires=datetime.now() - timedelta(days=1))

        # Set some un-expired content
        pc.set('key3', 'value3', persistent=True)
        pc.set('key4', 'value4', persistent=False)
        assert pc.prune() is True

        # Second call has no change made
        assert pc.prune() is False

    # Reset
    pc.delete()

    # directory initialization okay
    pc = PersistentStore(
        namespace=namespace, path=str(tmpdir),
        mode=PersistentStoreMode.FLUSH)

    # Write some content that expires almost immediately
    pc.set(
        'key1', 'value', persistent=True,
        expires=datetime.now() + timedelta(seconds=1))
    pc.set(
        'key2', 'value', persistent=True,
        expires=datetime.now() + timedelta(seconds=1))
    pc.set(
        'key3', 'value', persistent=True,
        expires=datetime.now() + timedelta(seconds=1))
    pc.flush()

    # Wait out our expiry
    time.sleep(1.3)

    # now initialize our storage again
    pc = PersistentStore(
        namespace=namespace, path=str(tmpdir),
        mode=PersistentStoreMode.FLUSH)

    # This triggers our __load_cache() which reads in a value
    # determined to have already been expired
    assert 'key1' not in pc
    assert 'key2' not in pc
    assert 'key3' not in pc

    # Sweep
    pc.delete()
    pc.set('key', 'value')
    pc.set('key2', 'value2')
    pc.write('more-content')
    # Flush our content to disk
    pc.flush()

    # Ideally we'd use os.stat below, but it is called inside a list
    # comprehension block and mock doesn't appear to throw the exception
    # there. So this is a bit of a cheat, but it works
    with mock.patch('builtins.sum', side_effect=OSError()):
        assert pc.size(exclude=True, lazy=False) == 0
        assert pc.size(exclude=False, lazy=False) == 0

    pc = PersistentStore(namespace=namespace, path=str(tmpdir))
    with mock.patch('glob.glob', side_effect=OSError()):
        assert pc.files(exclude=True, lazy=False) == []
        assert pc.files(exclude=False, lazy=False) == []

    pc = PersistentStore(
        namespace=namespace, path=str(tmpdir),
        mode=PersistentStoreMode.FLUSH)

    # Causes an initialization
    pc['abc'] = 1
    with mock.patch('os.unlink', side_effect=OSError()):
        # Now we can't set data
        with pytest.raises(KeyError):
            pc['new-key'] = 'value'
        # However keys that already exist don't get caught in check
        # and therefore won't throw
        pc['abc'] = 'value'

    #
    # Handles flush() when the queue is empty
    #
    pc.clear()
    with mock.patch('os.unlink', side_effect=OSError()):
        # We can't remove backup cache file
        assert pc.flush(force=True) is False

    with mock.patch('os.unlink', side_effect=FileNotFoundError()):
        # FileNotFound is not an issue
        assert pc.flush(force=True) is True

    with mock.patch('os.rename', side_effect=OSError()):
        # We can't create a backup
        assert pc.flush(force=True) is False

    with mock.patch('os.rename', side_effect=FileNotFoundError()):
        # FileNotFound is not an issue
        assert pc.flush(force=True) is True

    # Flush any previous cache and data
    pc.delete()

    #
    # Handles flush() cases where there is data to write
    #

    # Create a key
    pc.set('abc', 'a-test-value')
    with mock.patch(
            'os.unlink', side_effect=(OSError(), None)):
        # We failed to move our content in place
        assert pc.flush(force=True) is False

    with mock.patch(
            'os.unlink', side_effect=(OSError(), FileNotFoundError())):
        # We failed to move our content in place
        assert pc.flush(force=True) is False

    with mock.patch(
            'os.unlink', side_effect=(OSError(), OSError())):
        # We failed to move our content in place
        assert pc.flush(force=True) is False
|
|
|
|
|
def test_persistent_custom_io(tmpdir):
    """
    Test reading and writing custom files through open()/write()/read().
    """

    # Initialize it for memory only
    pc = PersistentStore(path=str(tmpdir))

    with pytest.raises(AttributeError):
        pc.open('!invalid#-Key')

    # We can't open the file as it does not exist
    with pytest.raises(FileNotFoundError):
        pc.open('valid-key')

    with pytest.raises(AttributeError):
        # Bad data
        pc.open(1234)

    with pytest.raises(FileNotFoundError):
        with pc.open('key') as fd:
            pass

    # Also can be caught using Apprise Exception Handling
    with pytest.raises(exception.AppriseFileNotFound):
        with pc.open('key') as fd:
            pass

    # Write some valid data
    with pc.open('new-key', 'wb') as fd:
        fd.write(b'data')

    with mock.patch("builtins.open", new_callable=mock.mock_open,
                    read_data="mocked file content") as mock_file:
        mock_file.side_effect = OSError
        with pytest.raises(exception.AppriseDiskIOError):
            with pc.open('new-key', compress=False) as fd:
                pass

    # Again but with compression this time
    with mock.patch("gzip.open", new_callable=mock.mock_open,
                    read_data="mocked file content") as mock_file:
        mock_file.side_effect = OSError
        with pytest.raises(exception.AppriseDiskIOError):
            with pc.open('new-key', compress=True) as fd:
                pass

    # Zlib error handling as well during open
    with mock.patch("gzip.open", new_callable=mock.mock_open,
                    read_data="mocked file content") as mock_file:
        mock_file.side_effect = zlib.error
        with pytest.raises(exception.AppriseDiskIOError):
            with pc.open('new-key', compress=True) as fd:
                pass

    # Writing anything but str/bytes/file-like is rejected
    with pytest.raises(AttributeError):
        pc.write(1234)

    with pytest.raises(AttributeError):
        pc.write(None)

    with pytest.raises(AttributeError):
        pc.write(True)

    pc = PersistentStore(str(tmpdir))
    with pc.open('key', 'wb') as fd:
        fd.write(b'test')
        fd.close()

    # Handle error capturing when failing to write to disk
    with mock.patch("gzip.open", new_callable=mock.mock_open,
                    read_data="mocked file content") as mock_file:
        mock_file.side_effect = zlib.error

        # We fail to write to disk
        assert pc.write(b'test') is False

        # We support other errors too
        mock_file.side_effect = OSError
        assert pc.write(b'test') is False

    with pytest.raises(AttributeError):
        pc.write(b'data', key='!invalid#-Key')

    pc.delete()
    with mock.patch('os.unlink', side_effect=OSError()):
        # Write our data and the __move() will fail under the hood
        assert pc.write(b'test') is False

    pc.delete()
    with mock.patch('os.rename', side_effect=OSError()):
        # Write our data and the __move() will fail under the hood
        assert pc.write(b'test') is False

    pc.delete()
    with mock.patch('os.unlink', side_effect=(OSError(), FileNotFoundError())):
        # Write our data and the __move() will fail under the hood
        assert pc.write(b'test') is False

    pc.delete()
    with mock.patch('os.unlink', side_effect=(OSError(), None)):
        # Write our data and the __move() will fail under the hood
        assert pc.write(b'test') is False

    pc.delete()
    with mock.patch('os.unlink', side_effect=(OSError(), OSError())):
        # Write our data and the __move() will fail under the hood
        assert pc.write(b'test') is False

    pc.delete()
    with mock.patch('os.rename', side_effect=(None, OSError(), None)):
        assert pc.write(b'test') is False

    with mock.patch('os.rename', side_effect=(None, OSError(), OSError())):
        assert pc.write(b'test') is False

    with mock.patch('os.rename', side_effect=(
            None, OSError(), FileNotFoundError())):
        assert pc.write(b'test') is False

    pc.delete()
    with mock.patch('os.rename', side_effect=(None, None, None, OSError())):
        # not enough reason to fail
        assert pc.write(b'test') is True

    with mock.patch('os.stat', side_effect=OSError()):
        with mock.patch('os.close', side_effect=(None, OSError())):
            assert pc.write(b'test') is False

    pc.delete()
    with mock.patch(
            'tempfile._TemporaryFileWrapper.close', side_effect=OSError()):
        assert pc.write(b'test') is False

    pc.delete()
    with mock.patch(
            'tempfile._TemporaryFileWrapper.close',
            side_effect=(OSError(), None)):
        with mock.patch('os.unlink', side_effect=OSError()):
            assert pc.write(b'test') is False

    pc.delete()
    with mock.patch(
            'tempfile._TemporaryFileWrapper.close',
            side_effect=(OSError(), None)):
        with mock.patch('os.unlink', side_effect=FileNotFoundError()):
            assert pc.write(b'test') is False
|
|
|
|
|
def test_persistent_storage_cache_object(tmpdir):
    """
    Exercise the CacheObject class: construction, set(), expiry handling,
    equality semantics, JSON serialization and instantiate().
    """
    # Smoke-test the simplest constructor form
    cache = CacheObject(123)

    now = datetime.now(tz=timezone.utc)
    expires = now + timedelta(days=1)
    # An entry set to expire one day from now
    cache = CacheObject('abcd', expires=expires)
    assert cache.expires == expires
    assert 86390.0 < cache.expires_sec <= 86400.0
    assert bool(cache) is True
    assert 'never' not in str(cache)
    assert 'str:+:abcd' in str(cache)

    #
    # CacheObject.set()
    #
    cache.set(123)
    assert 'never' not in str(cache)
    assert 'int:+:123' in str(cache)
    digest = cache.hash()
    assert isinstance(digest, str)

    cache.set(124)
    assert 'never' not in str(cache)
    assert 'int:+:124' in str(cache)
    assert cache.hash() != digest

    cache.set(123)
    # Restoring the original value restores the original hash as well
    assert cache.hash() == digest

    cache.set(124)
    assert isinstance(cache.hash(), str)
    assert cache.value == 124
    assert bool(cache) is True
    cache.set(124, expires=False, persistent=False)
    assert bool(cache) is True
    assert cache.expires is None
    assert cache.expires_sec is None
    cache.set(124, expires=True)
    # Flagged as expired; the object now evaluates as False
    assert bool(cache) is False

    #
    # Equality (==) handling
    #
    lhs = CacheObject('abc')
    rhs = CacheObject('abc')

    assert lhs == rhs
    assert lhs == 'abc'
    assert rhs == 'abc'

    # An expiry difference breaks object equality...
    rhs = CacheObject('abc', 30)
    assert lhs != rhs
    # ... though the wrapped values still compare equal
    assert lhs == rhs.value

    rhs = CacheObject('abc', persistent=False)
    lhs = CacheObject('abc', persistent=True)
    # The persistent flag also factors into equality
    assert lhs != rhs
    # ... but again the values themselves match
    assert lhs == rhs.value
    rhs = CacheObject('abc', persistent=True)
    assert lhs == rhs

    # Unix epoch reference used to build serialized 'x' (expiry) fields below
    EPOCH = datetime(1970, 1, 1)

    # Round-trip every supported type (naive and aware datetimes included)
    for payload in ('string', 123, 1.2222, datetime.now(),
                    datetime.now(tz=timezone.utc), None, False, True, b'\0'):
        # Expires one day out so the entry remains valid for the test
        cache = CacheObject(payload, datetime.now() + timedelta(days=1))

        # Content has not expired
        assert cache

        # Serialize through our JSON encoder and parse the result back
        data = json.loads(json.dumps(
            cache, separators=(',', ':'), cls=CacheJSONEncoder))

        # Rebuild the object from the parsed payload
        rebuilt = CacheObject.instantiate(data)
        assert rebuilt.json() == cache.json()

    # Types the encoder does not understand raise TypeError
    with pytest.raises(TypeError):
        json.loads(json.dumps(
            object(), separators=(',', ':'), cls=CacheJSONEncoder))

    assert CacheObject.instantiate(None) is None
    assert CacheObject.instantiate({}) is None

    # Bad data: 'x' is a datetime object instead of epoch seconds
    assert CacheObject.instantiate({
        'v': 123,
        'x': datetime.now(),
        'c': 'int'}) is None

    # Unsupported class reference in 'c'
    assert CacheObject.instantiate({
        'v': 123,
        'x': (datetime.now() - EPOCH).total_seconds(),
        'c': object}) is None

    restored = CacheObject.instantiate({
        'v': 123,
        'x': (datetime.now() - EPOCH).total_seconds(),
        'c': 'int'}, verify=False)
    assert isinstance(restored, CacheObject)
    assert restored.value == 123

    # verify=True with no checksum ('!') entry present: checksum test fails
    assert CacheObject.instantiate({
        'v': 123,
        'x': (datetime.now() - EPOCH).total_seconds(),
        'c': 'int'}, verify=True) is None

    # A garbage expiry value prevents instantiation
    assert CacheObject.instantiate({
        'v': 123,
        'x': 'garbage',
        'c': 'int'}, verify=False) is None

    # The checksum must be a valid sha string; a float will not do
    assert CacheObject.instantiate({
        'v': 123,
        'x': (datetime.now() - EPOCH).total_seconds(),
        'c': 'int',
        '!': 1.0}, verify=False) is None

    # Corrupted (non base64) bytes payload
    assert CacheObject.instantiate({
        'v': 'garbage',
        'x': (datetime.now() - EPOCH).total_seconds(),
        'c': 'bytes'}, verify=False) is None

    restored = CacheObject.instantiate({
        'v': 'AA==',
        'x': (datetime.now() - EPOCH).total_seconds(),
        'c': 'bytes'}, verify=False)
    assert isinstance(restored, CacheObject)
    assert restored.value == b'\0'

    # datetime payloads come back as real datetime objects
    restored = CacheObject.instantiate({
        'v': '2024-06-08T01:50:01.587267',
        'x': (datetime.now() - EPOCH).total_seconds(),
        'c': 'datetime'}, verify=False)
    assert isinstance(restored, CacheObject)
    assert restored.value == datetime(2024, 6, 8, 1, 50, 1, 587267)

    # A corrupt datetime payload yields None
    assert CacheObject.instantiate({
        'v': 'garbage',
        'x': (datetime.now() - EPOCH).total_seconds(),
        'c': 'datetime'}, verify=False) is None
|
|
|
|
|
@pytest.mark.skipif(
    sys.platform == "win32", reason="Unreliable results to be determined")
def test_persistent_storage_disk_prune(tmpdir):
    """
    General testing of the PersistentStore.disk_prune() and disk_scan()
    class methods (namespace filtering, expiry thresholds, action flag,
    and OS error handling).
    """

    # Persistent Storage Initialization (first namespace: t01)
    pc = PersistentStore(
        path=str(tmpdir), namespace='t01', mode=PersistentStoreMode.FLUSH)
    # Store some data
    assert pc.write(b'data-t01') is True
    assert pc.set('key-t01', 'value')

    # Second namespace: t02
    pc = PersistentStore(
        path=str(tmpdir), namespace='t02', mode=PersistentStoreMode.FLUSH)
    # Store some data
    assert pc.write(b'data-t02') is True
    assert pc.set('key-t02', 'value')

    # Prune anything older than 30s
    results = PersistentStore.disk_prune(path=str(tmpdir), expires=30)
    # Nothing is older than 30s right now
    assert isinstance(results, dict)
    assert 't01' in results
    assert 't02' in results
    assert len(results['t01']) == 0
    assert len(results['t02']) == 0

    pc = PersistentStore(
        path=str(tmpdir), namespace='t01', mode=PersistentStoreMode.FLUSH)

    # Nothing is pruned
    assert pc.get('key-t01') == 'value'
    assert pc.read() == b'data-t01'

    # An expiry of zero gets everything
    # Note: This test randomly fails in Microsoft Windows for unknown reasons
    # When this is determined, this test can be opened back up
    results = PersistentStore.disk_prune(path=str(tmpdir), expires=0)
    # We match everything now (2 entries per namespace)
    assert isinstance(results, dict)
    assert 't01' in results
    assert 't02' in results
    assert len(results['t01']) == 2
    assert len(results['t02']) == 2

    # Content is still not removed however because no action was put in place
    pc = PersistentStore(
        path=str(tmpdir), namespace='t02', mode=PersistentStoreMode.FLUSH)
    # Nothing is pruned
    assert pc.get('key-t02') == 'value'
    assert pc.read() == b'data-t02'
    pc = PersistentStore(
        path=str(tmpdir), namespace='t01', mode=PersistentStoreMode.FLUSH)
    # Nothing is pruned
    assert pc.get('key-t01') == 'value'
    assert pc.read() == b'data-t01'

    # disk_scan() quietly returns an empty list if the directory listing
    # raises an OSError ...
    with mock.patch('os.listdir', side_effect=OSError()):
        results = PersistentStore.disk_scan(
            namespace='t01', path=str(tmpdir), closest=True)
        assert isinstance(results, list)
        assert len(results) == 0

    # ... or a FileNotFoundError
    with mock.patch('os.listdir', side_effect=FileNotFoundError()):
        results = PersistentStore.disk_scan(
            namespace='t01', path=str(tmpdir), closest=True)
        assert isinstance(results, list)
        assert len(results) == 0

    # Without closest flag
    results = PersistentStore.disk_scan(
        namespace='t01', path=str(tmpdir), closest=False)
    assert isinstance(results, list)
    assert len(results) == 0

    # Now we'll filter on specific namespaces
    results = PersistentStore.disk_prune(
        namespace='notfound', path=str(tmpdir), expires=0, action=True)

    # nothing matched, nothing found
    assert isinstance(results, dict)
    assert len(results) == 0

    results = PersistentStore.disk_prune(
        namespace=('t01', 'invalid', '-garbag!'),
        path=str(tmpdir), expires=0, action=True)

    # only t01 would be cleaned now (the other entries are not valid
    # namespaces on disk)
    assert isinstance(results, dict)
    assert len(results) == 1
    assert len(results['t01']) == 2

    # A second call will yield no results because the content has
    # already been cleaned up
    results = PersistentStore.disk_prune(
        namespace='t01',
        path=str(tmpdir), expires=0, action=True)
    assert isinstance(results, dict)
    assert len(results) == 0

    # t02 is still untouched
    pc = PersistentStore(
        path=str(tmpdir), namespace='t02', mode=PersistentStoreMode.FLUSH)
    # Nothing is pruned
    assert pc.get('key-t02') == 'value'
    assert pc.read() == b'data-t02'

    # t01 of course... it's gone
    pc = PersistentStore(
        path=str(tmpdir), namespace='t01', mode=PersistentStoreMode.FLUSH)
    # t01 content was pruned above; lookups no longer return anything
    assert pc.get('key-t01') is None
    assert pc.read() is None

    with pytest.raises(AttributeError):
        # provide garbage in namespace field and we're going to have a problem
        PersistentStore.disk_prune(
            namespace=object, path=str(tmpdir), expires=0, action=True)

    # Error Handling
    with mock.patch('os.path.getmtime', side_effect=FileNotFoundError()):
        results = PersistentStore.disk_prune(
            namespace='t02', path=str(tmpdir), expires=0, action=True)
        assert isinstance(results, dict)
        assert len(results) == 1
        assert len(results['t02']) == 0

    # no files were removed, so our data is still accessible
    pc = PersistentStore(
        path=str(tmpdir), namespace='t02', mode=PersistentStoreMode.FLUSH)
    # Nothing is pruned
    assert pc.get('key-t02') == 'value'
    assert pc.read() == b'data-t02'

    with mock.patch('os.path.getmtime', side_effect=OSError()):
        results = PersistentStore.disk_prune(
            namespace='t02', path=str(tmpdir), expires=0, action=True)
        assert isinstance(results, dict)
        assert len(results) == 1
        assert len(results['t02']) == 0

    # no files were removed, so our data is still accessible
    pc = PersistentStore(
        path=str(tmpdir), namespace='t02', mode=PersistentStoreMode.FLUSH)
    # Nothing is pruned
    assert pc.get('key-t02') == 'value'
    assert pc.read() == b'data-t02'

    with mock.patch('os.unlink', side_effect=FileNotFoundError()):
        results = PersistentStore.disk_prune(
            namespace='t02', path=str(tmpdir), expires=0, action=True)
        assert isinstance(results, dict)
        assert len(results) == 1
        # matches are still reported even though the unlink was mocked out
        assert len(results['t02']) == 2

    # no files were actually removed (unlink raised), so our data is still
    # accessible
    pc = PersistentStore(
        path=str(tmpdir), namespace='t02', mode=PersistentStoreMode.FLUSH)
    # Nothing is pruned
    assert pc.get('key-t02') == 'value'
    assert pc.read() == b'data-t02'

    with mock.patch('os.unlink', side_effect=OSError()):
        results = PersistentStore.disk_prune(
            namespace='t02', path=str(tmpdir), expires=0, action=True)
        assert isinstance(results, dict)
        assert len(results) == 1
        assert len(results['t02']) == 2

    # no files were actually removed (unlink raised), so our data is still
    # accessible
    pc = PersistentStore(
        path=str(tmpdir), namespace='t02', mode=PersistentStoreMode.FLUSH)
    # Nothing is pruned
    assert pc.get('key-t02') == 'value'
    assert pc.read() == b'data-t02'

    with mock.patch('os.rmdir', side_effect=OSError()):
        results = PersistentStore.disk_prune(
            namespace='t02', path=str(tmpdir), expires=0, action=True)
        assert isinstance(results, dict)
        assert len(results) == 1
        assert len(results['t02']) == 2

    # This time only the directory cleanup (rmdir) failed; the files inside
    # the namespace were removed, so the data is no longer accessible
    pc = PersistentStore(
        path=str(tmpdir), namespace='t02', mode=PersistentStoreMode.FLUSH)
    assert pc.get('key-t02') is None
    assert pc.read() is None
|
|
|
|
|
def test_persistent_storage_disk_changes(tmpdir):
    """
    General testing of a Persistent Store with underlying disk changes
    (namespace path occupied by a file, directories removed out from
    under an initialized store).
    """

    # Create a garbage file in place of where the namespace should be
    tmpdir.join('t01').write('0' * 1024)

    # Persistent Storage Initialization where the namespace directory is
    # already occupied by a filename
    pc = PersistentStore(
        path=str(tmpdir), namespace='t01', mode=PersistentStoreMode.FLUSH)

    # Store some data and note that it isn't possible
    assert pc.write(b'data-t01') is False
    # We actually fell back to memory mode:
    assert pc.mode == PersistentStoreMode.MEMORY

    # set() calls still work (in memory only)
    assert pc.set('key-t01', 'value')

    # But upon re-initialization (enforcing memory mode check) we will not
    # have the data available to us
    pc = PersistentStore(
        path=str(tmpdir), namespace='t01', mode=PersistentStoreMode.FLUSH)

    assert pc.get('key-t01') is None

    #
    # Test situation where the file structure changed after initialization
    #
    pc = PersistentStore(
        path=str(tmpdir), namespace='t02', mode=PersistentStoreMode.FLUSH)
    # Our mode stuck as t02 initialized correctly
    assert pc.mode == PersistentStoreMode.FLUSH
    assert os.path.isdir(pc.path)

    # Remove the namespace directory out from under the store
    shutil.rmtree(pc.path)
    assert not os.path.isdir(pc.path)
    assert pc.set('key-t02', 'value')
    # The directory got re-created (FLUSH mode writes immediately)
    assert os.path.isdir(pc.path)

    # Same test but flag set to AUTO
    pc = PersistentStore(
        path=str(tmpdir), namespace='t02', mode=PersistentStoreMode.AUTO)
    # Our mode stuck as t02 initialized correctly
    assert pc.mode == PersistentStoreMode.AUTO
    assert os.path.isdir(pc.path)

    shutil.rmtree(pc.path)
    assert not os.path.isdir(pc.path)
    assert pc.set('key-t02', 'value')
    # The directory is not recreated because of auto; it will occur on save
    assert not os.path.isdir(pc.path)
    path = pc.path
    # Dropping the store triggers the deferred AUTO-mode save
    del pc
    # It exists now
    assert os.path.isdir(path)

    pc = PersistentStore(
        path=str(tmpdir), namespace='t02', mode=PersistentStoreMode.FLUSH)
    # Content was not lost
    assert pc.get('key-t02') == 'value'

    # We'll remove a sub directory of it this time (the temporary work dir)
    shutil.rmtree(os.path.join(pc.path, pc.temp_dir))

    # We will still successfully write our data
    assert pc.write(b'data-t02') is True
    assert os.path.isdir(pc.path)

    # Remove the whole namespace directory again; set() re-creates it
    # since we're in FLUSH mode
    shutil.rmtree(pc.path)
    assert not os.path.isdir(pc.path)
    assert pc.set('key-t01', 'value')
|
|
|