Moto
import pytest
from moto import mock_s3
@pytest.fixture(scope="function")
def aws_credentials():
"""Mocked AWS Credentials for moto."""
os.environ["AWS_ACCESS_KEY_ID"] = "testing"
os.environ["AWS_SECRET_ACCESS_KEY"] = "testing"
os.environ["AWS_SECURITY_TOKEN"] = "testing"
os.environ["AWS_SESSION_TOKEN"] = "testing"
os.environ["AWS_DEFAULT_REGION"] = "us-east-1"
@pytest.fixture(scope="function")
def s3(aws_credentials):
with mock_s3():
yield boto3.client("s3", region_name="us-east-1")
def test_create_bucket(s3):
bucket_name = "test-bucket"
s3.create_bucket(Bucket=bucket_name)
actual = s3.list_buckets()
assert len(actual['Buckets']) == 1
assert actual['Buckets'][0]['Name'] == bucket_name
s3fs moto testing
import asyncio
import errno
import datetime
from contextlib import contextmanager
import json
from concurrent.futures import ProcessPoolExecutor
import io
import os
import random
import requests
import time
import sys
import pytest
import moto
from itertools import chain
import fsspec.core
from dateutil.tz import tzutc
import s3fs.core
from s3fs.core import S3FileSystem
from s3fs.utils import ignoring, SSEParams
from botocore.exceptions import NoCredentialsError
from fsspec.asyn import sync
from fsspec.callbacks import Callback
from packaging import version
test_bucket_name = "test"
secure_bucket_name = "test-secure"
versioned_bucket_name = "test-versioned"
files = {
"test/accounts.1.json": (
b'{"amount": 100, "name": "Alice"}\n'
b'{"amount": 200, "name": "Bob"}\n'
b'{"amount": 300, "name": "Charlie"}\n'
b'{"amount": 400, "name": "Dennis"}\n'
),
"test/accounts.2.json": (
b'{"amount": 500, "name": "Alice"}\n'
b'{"amount": 600, "name": "Bob"}\n'
b'{"amount": 700, "name": "Charlie"}\n'
b'{"amount": 800, "name": "Dennis"}\n'
),
}
csv_files = {
"2014-01-01.csv": (
b"name,amount,id\n" b"Alice,100,1\n" b"Bob,200,2\n" b"Charlie,300,3\n"
),
"2014-01-02.csv": (b"name,amount,id\n"),
"2014-01-03.csv": (
b"name,amount,id\n" b"Dennis,400,4\n" b"Edith,500,5\n" b"Frank,600,6\n"
),
}
text_files = {
"nested/file1": b"hello\n",
"nested/file2": b"world",
"nested/nested2/file1": b"hello\n",
"nested/nested2/file2": b"world",
}
glob_files = {"file.dat": b"", "filexdat": b""}
a = test_bucket_name + "/tmp/test/a"
b = test_bucket_name + "/tmp/test/b"
c = test_bucket_name + "/tmp/test/c"
d = test_bucket_name + "/tmp/test/d"
port = 5555
endpoint_uri = "http://127.0.0.1:%s/" % port
@pytest.fixture()
def s3_base():
# writable local S3 system
import shlex
import subprocess
try:
# should fail since we didn't start server yet
r = requests.get(endpoint_uri)
except:
pass
else:
if r.ok:
raise RuntimeError("moto server already up")
if "AWS_SECRET_ACCESS_KEY" not in os.environ:
os.environ["AWS_SECRET_ACCESS_KEY"] = "foo"
if "AWS_ACCESS_KEY_ID" not in os.environ:
os.environ["AWS_ACCESS_KEY_ID"] = "foo"
proc = subprocess.Popen(
shlex.split("moto_server s3 -p %s" % port),
stderr=subprocess.DEVNULL,
stdout=subprocess.DEVNULL,
stdin=subprocess.DEVNULL,
)
timeout = 5
while timeout > 0:
try:
print("polling for moto server")
r = requests.get(endpoint_uri)
if r.ok:
break
except:
pass
timeout -= 0.1
time.sleep(0.1)
print("server up")
yield
print("moto done")
proc.terminate()
proc.wait()
def get_boto3_client():
from botocore.session import Session
# NB: we use the sync botocore client for setup
session = Session()
return session.create_client("s3", endpoint_url=endpoint_uri)
@pytest.fixture()
def s3(s3_base):
client = get_boto3_client()
client.create_bucket(Bucket=test_bucket_name, ACL="public-read")
client.create_bucket(Bucket=versioned_bucket_name, ACL="public-read")
client.put_bucket_versioning(
Bucket=versioned_bucket_name, VersioningConfiguration={"Status": "Enabled"}
)
# initialize secure bucket
client.create_bucket(Bucket=secure_bucket_name, ACL="public-read")
policy = json.dumps(
{
"Version": "2012-10-17",
"Id": "PutObjPolicy",
"Statement": [
{
"Sid": "DenyUnEncryptedObjectUploads",
"Effect": "Deny",
"Principal": "*",
"Action": "s3:PutObject",
"Resource": "arn:aws:s3:::{bucket_name}/*".format(
bucket_name=secure_bucket_name
),
"Condition": {
"StringNotEquals": {
"s3:x-amz-server-side-encryption": "aws:kms"
}
},
}
],
}
)
client.put_bucket_policy(Bucket=secure_bucket_name, Policy=policy)
for flist in [files, csv_files, text_files, glob_files]:
for f, data in flist.items():
client.put_object(Bucket=test_bucket_name, Key=f, Body=data)
S3FileSystem.clear_instance_cache()
s3 = S3FileSystem(anon=False, client_kwargs={"endpoint_url": endpoint_uri})
s3.invalidate_cache()
yield s3
def test_simple(s3):
data = b"a" * (10 * 2**20)
with s3.open(a, "wb") as f:
f.write(data)
with s3.open(a, "rb") as f:
out = f.read(len(data))
assert len(data) == len(out)
assert out == data
import boto3
import s3fs
from moto.server import ThreadedMotoServer
server = ThreadedMotoServer()
server.start()
s3 = boto3.client("s3", endpoint_url="http://localhost:5000")
s3.create_bucket(Bucket="test")
data = b"a" * (10 * 2**20)
fs = s3fs.S3FileSystem(anon=False, client_kwargs={"endpoint_url": "http://localhost:5000"})
# Write and read data
with fs.open("test/a", "wb") as f:
f.write(data)
with fs.open("test/a", "rb") as f:
out = f.read()
assert out == data
# Create empty file
fs.touch("test/b")
import boto3
import os
import pytest
import s3fs
from moto.server import ThreadedMotoServer
os.environ["AWS_ACCESS_KEY_ID"] = "testing"
os.environ["AWS_SECRET_ACCESS_KEY"] = "testing"
os.environ["AWS_SECURITY_TOKEN"] = "testing"
os.environ["AWS_SESSION_TOKEN"] = "testing"
os.environ["AWS_DEFAULT_REGION"] = "us-east-1"
@pytest.fixture
def fs() -> s3fs.S3FileSystem:
server = ThreadedMotoServer()
server.start()
s3 = boto3.client("s3", endpoint_url="http://localhost:5000")
s3.create_bucket(Bucket="test")
fs = s3fs.S3FileSystem(anon=False, client_kwargs={"endpoint_url": "http://localhost:5000"})
return fs
def test_s3_touch_a(fs):
fs.touch("test/a")
assert fs.ls("test") == ["test/a"]
fs.rm("test/a")
def test_s3_touch_b(fs):
fs.touch("test/b")
assert fs.ls("test") == ["test/b"]
fs.rm("test/b")
import boto3
import os
import pytest
import s3fs
from moto.server import ThreadedMotoServer
os.environ["AWS_ACCESS_KEY_ID"] = "testing"
os.environ["AWS_SECRET_ACCESS_KEY"] = "testing"
os.environ["AWS_SECURITY_TOKEN"] = "testing"
os.environ["AWS_SESSION_TOKEN"] = "testing"
os.environ["AWS_DEFAULT_REGION"] = "us-east-1"
server = ThreadedMotoServer(port=5000)
server.start()
s3 = boto3.client("s3", endpoint_url="http://localhost:5000")
s3.create_bucket(Bucket="test")
fs = s3fs.S3FileSystem(anon=False, client_kwargs={"endpoint_url": "http://localhost:5000"})
fs.touch("test/a")
assert fs.ls("test") == ["test/a"]
fs.rm("test/a")
fs.touch("test/b")
assert fs.ls("test") == ["test/b"]
fs.rm("test/b")
server.stop()
$ moto_server dynamodb
import boto3
# Create a client with the DynamoDB service
dynamodb = boto3.client('dynamodb', endpoint_url='http://localhost:5000')
# Create a table
dynamodb.create_table(
TableName='my_table',
KeySchema=[{'AttributeName': 'id', 'KeyType': 'HASH'}],
AttributeDefinitions=[{'AttributeName': 'id', 'AttributeType': 'S'}],
)
# Create a table
dynamodb.create_table(
TableName='my_table',
KeySchema=[{'AttributeName': 'id', 'KeyType': 'HASH'}],
AttributeDefinitions=[{'AttributeName': 'id', 'AttributeType': 'S'}],
BillingMode='PAY_PER_REQUEST'
)
# Put an item into the table
dynamodb.put_item(
TableName='my_table',
Item={'id': {'S': '1'}, 'name': {'S': 'John Doe'}}
)
# Get an item from the table
response = dynamodb.get_item(
TableName='my_table',
Key={'id': {'S': '1'}}
)
print(response['Item'])