Moto

import pytest

from moto import mock_s3


@pytest.fixture(scope="function")
def aws_credentials(monkeypatch):
    """Provide dummy AWS credentials for moto-backed tests.

    The values are installed with pytest's ``monkeypatch`` fixture rather
    than by writing to ``os.environ`` directly, so the environment that
    existed before the test is restored when the test finishes and the
    fake credentials cannot leak into unrelated tests.
    """
    monkeypatch.setenv("AWS_ACCESS_KEY_ID", "testing")
    monkeypatch.setenv("AWS_SECRET_ACCESS_KEY", "testing")
    monkeypatch.setenv("AWS_SECURITY_TOKEN", "testing")
    monkeypatch.setenv("AWS_SESSION_TOKEN", "testing")
    monkeypatch.setenv("AWS_DEFAULT_REGION", "us-east-1")



@pytest.fixture(scope="function")
def s3(aws_credentials):
    """Yield a boto3 S3 client created while moto's ``mock_s3`` is active.

    Depends on ``aws_credentials`` so the dummy credentials are in place
    before the client is built. The mock stays active until the test that
    requested this fixture has finished.
    """
    # Imported locally so the fixture does not rely on a module-level
    # ``import boto3`` being present.
    import boto3

    with mock_s3():
        yield boto3.client("s3", region_name="us-east-1")



def test_create_bucket(s3):
    """Creating one bucket makes it the only entry returned by list_buckets."""
    expected_name = "test-bucket"
    s3.create_bucket(Bucket=expected_name)

    actual = s3.list_buckets()

    assert len(actual["Buckets"]) == 1
    assert actual["Buckets"][0]["Name"] == expected_name

s3fs moto testing

import asyncio

import errno

import datetime

from contextlib import contextmanager

import json

from concurrent.futures import ProcessPoolExecutor

import io

import os

import random

import requests

import time

import sys

import pytest

import moto

from itertools import chain

import fsspec.core

from dateutil.tz import tzutc


import s3fs.core

from s3fs.core import S3FileSystem

from s3fs.utils import ignoring, SSEParams

from botocore.exceptions import NoCredentialsError

from fsspec.asyn import sync

from fsspec.callbacks import Callback

from packaging import version


# Bucket names used by the fixtures and tests.
test_bucket_name = "test"
secure_bucket_name = "test-secure"
versioned_bucket_name = "test-versioned"

# Object key -> newline-delimited JSON payload.
files = {
    "test/accounts.1.json": b"".join(
        [
            b'{"amount": 100, "name": "Alice"}\n',
            b'{"amount": 200, "name": "Bob"}\n',
            b'{"amount": 300, "name": "Charlie"}\n',
            b'{"amount": 400, "name": "Dennis"}\n',
        ]
    ),
    "test/accounts.2.json": b"".join(
        [
            b'{"amount": 500, "name": "Alice"}\n',
            b'{"amount": 600, "name": "Bob"}\n',
            b'{"amount": 700, "name": "Charlie"}\n',
            b'{"amount": 800, "name": "Dennis"}\n',
        ]
    ),
}

# Object key -> CSV payload; the middle file holds only the header row.
csv_files = {
    "2014-01-01.csv": (
        b"name,amount,id\n"
        b"Alice,100,1\n"
        b"Bob,200,2\n"
        b"Charlie,300,3\n"
    ),
    "2014-01-02.csv": b"name,amount,id\n",
    "2014-01-03.csv": (
        b"name,amount,id\n"
        b"Dennis,400,4\n"
        b"Edith,500,5\n"
        b"Frank,600,6\n"
    ),
}

# Small text objects spread over two directory levels.
text_files = {
    "nested/file1": b"hello\n",
    "nested/file2": b"world",
    "nested/nested2/file1": b"hello\n",
    "nested/nested2/file2": b"world",
}

# Two empty objects whose keys differ only in the character before "dat".
glob_files = {"file.dat": b"", "filexdat": b""}

# Scratch paths inside the test bucket.
a = test_bucket_name + "/tmp/test/a"
b = test_bucket_name + "/tmp/test/b"
c = test_bucket_name + "/tmp/test/c"
d = test_bucket_name + "/tmp/test/d"

# Address of the local moto server.
port = 5555
endpoint_uri = "http://127.0.0.1:%s/" % port



@pytest.fixture()
def s3_base():
    """Run a local ``moto_server`` S3 endpoint for the duration of one test.

    Refuses to start when something already answers successfully at
    ``endpoint_uri``, fills in placeholder AWS credentials if none are set,
    launches ``moto_server s3`` as a subprocess, and polls the endpoint until
    it responds. The subprocess is always terminated on the way out, whether
    the test ran or startup failed.

    Raises:
        RuntimeError: if a server is already up before launch, or if the
            launched server does not answer within the startup window.
    """
    # writable local S3 system
    import shlex
    import subprocess

    try:
        # should fail since we didn't start server yet
        r = requests.get(endpoint_uri)
    except requests.exceptions.RequestException:
        # Nothing is listening yet, which is the expected situation.
        pass
    else:
        if r.ok:
            raise RuntimeError("moto server already up")

    # Only supply placeholder credentials when the caller has not set any.
    os.environ.setdefault("AWS_SECRET_ACCESS_KEY", "foo")
    os.environ.setdefault("AWS_ACCESS_KEY_ID", "foo")

    proc = subprocess.Popen(
        shlex.split("moto_server s3 -p %s" % port),
        stderr=subprocess.DEVNULL,
        stdout=subprocess.DEVNULL,
        stdin=subprocess.DEVNULL,
    )
    try:
        startup_window = 5  # seconds
        deadline = time.monotonic() + startup_window
        while True:
            print("polling for moto server")
            try:
                if requests.get(endpoint_uri).ok:
                    break
            except requests.exceptions.RequestException:
                # Server is not accepting connections yet; keep polling.
                pass
            if time.monotonic() >= deadline:
                # Fail here instead of letting the test run against a
                # server that never came up.
                raise RuntimeError(
                    "moto server did not start within %s seconds" % startup_window
                )
            time.sleep(0.1)
        print("server up")
        yield
        print("moto done")
    finally:
        proc.terminate()
        proc.wait()



def get_boto3_client():
    """Return a synchronous botocore S3 client pointed at ``endpoint_uri``."""
    from botocore.session import Session

    # NB: we use the sync botocore client for setup
    return Session().create_client("s3", endpoint_url=endpoint_uri)



@pytest.fixture()
def s3(s3_base):
    """Populate the moto server and yield an ``S3FileSystem`` bound to it.

    Creates the plain, versioned and secure buckets (all ``public-read``),
    enables versioning on the versioned bucket, attaches a policy to the
    secure bucket that denies ``s3:PutObject`` unless the
    ``s3:x-amz-server-side-encryption`` condition equals ``aws:kms``, and
    uploads every entry of ``files``, ``csv_files``, ``text_files`` and
    ``glob_files`` into the test bucket.
    """
    client = get_boto3_client()

    client.create_bucket(Bucket=test_bucket_name, ACL="public-read")

    client.create_bucket(Bucket=versioned_bucket_name, ACL="public-read")
    versioning = {"Status": "Enabled"}
    client.put_bucket_versioning(
        Bucket=versioned_bucket_name, VersioningConfiguration=versioning
    )

    # initialize secure bucket
    client.create_bucket(Bucket=secure_bucket_name, ACL="public-read")
    secure_objects_arn = "arn:aws:s3:::{bucket_name}/*".format(
        bucket_name=secure_bucket_name
    )
    deny_unencrypted = {
        "Sid": "DenyUnEncryptedObjectUploads",
        "Effect": "Deny",
        "Principal": "*",
        "Action": "s3:PutObject",
        "Resource": secure_objects_arn,
        "Condition": {
            "StringNotEquals": {"s3:x-amz-server-side-encryption": "aws:kms"}
        },
    }
    policy_document = {
        "Version": "2012-10-17",
        "Id": "PutObjPolicy",
        "Statement": [deny_unencrypted],
    }
    client.put_bucket_policy(
        Bucket=secure_bucket_name, Policy=json.dumps(policy_document)
    )

    # Upload the sample objects group by group, in declaration order.
    sample_groups = (files, csv_files, text_files, glob_files)
    for key, body in chain.from_iterable(group.items() for group in sample_groups):
        client.put_object(Bucket=test_bucket_name, Key=key, Body=body)

    S3FileSystem.clear_instance_cache()
    filesystem = S3FileSystem(
        anon=False, client_kwargs={"endpoint_url": endpoint_uri}
    )
    filesystem.invalidate_cache()
    yield filesystem


def test_simple(s3):
    """Write 10 MiB of ``b"a"`` to path ``a`` and read the same bytes back."""
    payload = b"a" * (10 * 2**20)

    with s3.open(a, "wb") as writer:
        writer.write(payload)

    with s3.open(a, "rb") as reader:
        received = reader.read(len(payload))
        assert len(payload) == len(received)
        assert received == payload

import boto3

import s3fs


from moto.server import ThreadedMotoServer


# Start an in-process moto server; the clients below reach it on port 5000.
server = ThreadedMotoServer()
server.start()

try:
    s3 = boto3.client("s3", endpoint_url="http://localhost:5000")
    s3.create_bucket(Bucket="test")

    data = b"a" * (10 * 2**20)

    fs = s3fs.S3FileSystem(
        anon=False, client_kwargs={"endpoint_url": "http://localhost:5000"}
    )

    # Write and read data
    with fs.open("test/a", "wb") as f:
        f.write(data)

    with fs.open("test/a", "rb") as f:
        out = f.read()

    assert out == data

    # Create empty file
    fs.touch("test/b")
finally:
    # Always shut the server down, including when a call or the assertion
    # above fails, so it does not stay bound to the port.
    server.stop()

import boto3

import os

import pytest

import s3fs


from moto.server import ThreadedMotoServer



# Dummy credentials and region so the AWS clients never pick up real ones.
os.environ.update(
    {
        "AWS_ACCESS_KEY_ID": "testing",
        "AWS_SECRET_ACCESS_KEY": "testing",
        "AWS_SECURITY_TOKEN": "testing",
        "AWS_SESSION_TOKEN": "testing",
        "AWS_DEFAULT_REGION": "us-east-1",
    }
)


@pytest.fixture
def fs(request) -> s3fs.S3FileSystem:
    """Return an ``S3FileSystem`` backed by a moto server started for this test.

    A ``ThreadedMotoServer`` is started, a bucket named ``test`` is created
    through boto3, and a filesystem pointed at the server is returned. The
    server is stopped again when the requesting test finishes, so the next
    test can start its own server on the same port.

    Args:
        request: pytest's built-in fixture request object, used to register
            the server shutdown as a finalizer.
    """
    server = ThreadedMotoServer()
    server.start()
    # Register the shutdown before the remaining setup so the server is not
    # left running if one of the following steps raises.
    request.addfinalizer(server.stop)

    s3 = boto3.client("s3", endpoint_url="http://localhost:5000")
    s3.create_bucket(Bucket="test")

    # Drop filesystem instances cached by earlier tests so this test does
    # not reuse an object (and its cached listings) from a previous server.
    s3fs.S3FileSystem.clear_instance_cache()
    filesystem = s3fs.S3FileSystem(
        anon=False, client_kwargs={"endpoint_url": "http://localhost:5000"}
    )
    return filesystem



def test_s3_touch_a(fs):
    """Touching ``test/a`` makes it the sole listing entry; then remove it."""
    target = "test/a"
    fs.touch(target)

    listing = fs.ls("test")
    assert listing == [target]

    fs.rm(target)



def test_s3_touch_b(fs):
    """Touching ``test/b`` makes it the sole listing entry; then remove it."""
    target = "test/b"
    fs.touch(target)

    listing = fs.ls("test")
    assert listing == [target]

    fs.rm(target)

import boto3

import os

import pytest

import s3fs


from moto.server import ThreadedMotoServer


# Dummy credentials and region so the AWS clients never pick up real ones.
os.environ.update(
    {
        "AWS_ACCESS_KEY_ID": "testing",
        "AWS_SECRET_ACCESS_KEY": "testing",
        "AWS_SECURITY_TOKEN": "testing",
        "AWS_SESSION_TOKEN": "testing",
        "AWS_DEFAULT_REGION": "us-east-1",
    }
)


# Start an in-process moto server on port 5000 for the checks below.
server = ThreadedMotoServer(port=5000)
server.start()

try:
    s3 = boto3.client("s3", endpoint_url="http://localhost:5000")
    s3.create_bucket(Bucket="test")

    fs = s3fs.S3FileSystem(
        anon=False, client_kwargs={"endpoint_url": "http://localhost:5000"}
    )

    # First round: create, list and remove "test/a".
    fs.touch("test/a")
    assert fs.ls("test") == ["test/a"]
    fs.rm("test/a")

    # Second round against the same server: "test/b" must be the only entry.
    fs.touch("test/b")
    assert fs.ls("test") == ["test/b"]
    fs.rm("test/b")
finally:
    # Stop the server even when an assertion or S3 call above fails.
    server.stop()

$ moto_server dynamodb


import boto3


# Create a client with the DynamoDB service
dynamodb = boto3.client('dynamodb', endpoint_url='http://localhost:5000')

# Create the table exactly once. The billing mode is given explicitly:
# without BillingMode the table is treated as PROVISIONED, which also needs
# a ProvisionedThroughput argument. Calling create_table a second time for
# the same TableName would be rejected because the table already exists.
dynamodb.create_table(
    TableName='my_table',
    KeySchema=[{'AttributeName': 'id', 'KeyType': 'HASH'}],
    AttributeDefinitions=[{'AttributeName': 'id', 'AttributeType': 'S'}],
    BillingMode='PAY_PER_REQUEST',
)

# Put an item into the table
dynamodb.put_item(
    TableName='my_table',
    Item={'id': {'S': '1'}, 'name': {'S': 'John Doe'}},
)

# Get an item from the table
response = dynamodb.get_item(
    TableName='my_table',
    Key={'id': {'S': '1'}},
)

print(response['Item'])