Dynamodb 入门学习

DynamoDB 本地部署

准备工作

安装及部署

  1. 将下载的 DynamoDB 压缩包文件 解压到 自定义文件下

  2. 在 自定义文件夹下 (DynamoDBLocal.jar 同级目录内) 打开 命令行窗口

  3. 输入以下命令,启动 DynamoDB

    1
    java -Djava.library.path=./DynamoDBLocal_lib -jar DynamoDBLocal.jar -sharedDb
  4. 配置凭证用于工作使用,凭证内容可以自定义

    1
    2
    3
    4
    AWS Access Key ID [None]: *********************
    AWS Secret Access Key [None]: *****************
    Default region name [None]: us-west-2
    Default output format [None]: json

Python 操作 DynamoDB

准备工作

  • pip 安装 外置包 boto3 操作 DynamoDB

    pip install boto3

连接 DynamoDB 数据库

  1. 本地连接

    1
    dynamodb = boto3.resource('ddynamodb', endpoint_url="http://localhost:8000")
  1. 使用线上 Web 服务 (region_name: 表示使用服务器区域)

    1
    dynamodb = boto3.resource('dynamodb', region_name='us-west-2')

创建表操作: create_table()

  • create_table()

    • 调用时,需要指定表名称、主键属性及其数据类型
    • KeySchema : 设置分区键与排序键
    • ProvisionedThroughput : 必须参数,定义读取与写入单位
    • AttributeDefinitions : 设置分区主键与排序主键内容类型
  • 创表实例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    table = dynamodb.create_table(
    TableName='Movies',
    KeySchema=[
    # 设置分区键
    {
    'AttributeName': 'year',
    'KeyType': 'HASH'
    },
    # 设置排序键
    {
    'AttributeName': 'title',
    'KeyType': 'RANGE'
    }
    ],
    AttributeDefinitions=[
    # 设置分区键 year 的数据类型 为 Number
    {
    'AttributeName': 'year',
    'AttributeType': 'N'
    },
    # 设置排序键 title 的数据类型为 String
    {
    'AttributeName': 'title',
    'AttributeType': 'S'
    }
    ],
    # 定义读取与写入容量单位
    ProvisionedThroughput={
    'ReadCapacityUnits': 10,
    'WriteCapacityUnits': 10
    }
    )

将 JSON 数据加载到 表内: put_item()

  • 加载实例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    def load_movies(movies, dynamodb=None):
    if not dynamodb:
    dynamodb = boto3.resource('dynamodb', endpoint_url="http://localhost:8000")

    table = dynamodb.Table('Movies')
    for movie in movies:
    # 读取并显示数据到面板
    year = int(movie['year'])
    title = movie['title']
    print("Adding movie:", year, title)
    # 加载数据
    table.put_item(Item=movie)

    if __name__ == '__main__':
    with open("moviedata.json") as json_file:
    movie_list = json.load(json_file, parse_float=Decimal)
    load_movies(movie_list)

CRUD 操作

向已存在表内创建新项目 put_item()
  • 创建实例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    # 创建新项目
    def put_movie(title, year, plot, rating, dynamodb=None):
    if not dynamodb:
    dynamodb = boto3.resource('dynamodb', endpoint_url='http://localhost:8000')

    table = dynamodb.Table('Movies')
    response = table.put_item(
    Item={
    'year': year,
    'title': title,
    'info': {
    'plot': plot,
    'rating': rating
    }
    }
    )
    return response

    if __name__ == '__main__':
    movie_resp = put_movie('The Big New Movie', 2015, 'Nothing happens at all.', 0)
    print('Put movie succeeded:')
    pprint(movie_resp)
指定主键值读取项目: get_item()
  • get_item(Key={'分区键名': data1, '排序键名: data2})

  • 读取实例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    import boto3
    from pprint import pprint
    from botocore.exceptions import ClientError

    # 读取数据
    def get_movie(title, year, dynamodb=None):
    if not dynamodb:
    dynamodb = boto3.resource('dynamodb', endpoint_url='http://localhost:8000')

    table = dynamodb.Table('Movies')

    try:
    response = table.get_item(Key={'year': year, 'title': title})
    except ClientError as e:
    print(e.response['Error']['Message'])
    else:
    return response['Item']

    if __name__ == '__main__':
    movie = get_movie('The Big New Movie', 2015,)
    if movie:
    print('Get movie succeeded:')
    pprint(movie)
    # python 3.6 没有 sort_dicts 形参
    # pprint(movie, sort_dicts=False)
更新项目 (更新现有属性的值、添加新属性或删除属性): update_item()
  • update_item()

    • 更改现有属性的值
    • 向现有项目内添加新的列表属性
  • 注意事项

    • DecimalEncoder 类用于打印使用 Decimal 类存储的数字。BotoSDK 使用Decimal类来保存 Amazon DynamoDB 数字值。
    • 使用 UpdateExpression 来描述您要对指定项目执行的所有更新
    • ReturnValues参数用于指示 DynamoDB 仅返回更新后的属性 (UPDATED_NEW)
  • 更新实现

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    import boto3
    from decimal import Decimal
    from pprint import pprint

    def update_movie(title, year, rating, plot, actors, dynamodb=None):
    if not dynamodb:
    dynamodb = boto3.resource('dynamodb', endpoint_url='http://localhost:8000')

    table = dynamodb.Table('Movies')

    response = table.update_item(
    Key={
    'year': year,
    'title': title
    },
    # UpdateExpression 来描述您要对指定项目执行的所有更新。
    # 对更新内容设置
    UpdateExpression='set info.rating=:r, info.plot=:p, info.actors=:a',
    ExpressionAttributeValues={
    ':r': Decimal(rating),
    ':p': plot,
    ':a': actors
    },
    # ReturnValues参数用于指示 DynamoDB 仅返回更新后的属性 (UPDATED_NEW)。
    ReturnValues="UPDATED_NEW"
    )
    return response

    if __name__ == '__main__':
    update_response = update_movie(
    'The Big New Movie', 2015, 5.5, 'Everything happens all at once.',
    ['larry', 'Moe', 'Curly']
    )
    print("Ypdate movie succeeded:")
    pprint(update_response)
增加原子计数器: update_item()
  • update_item()

    • 递增或递减现有属性的值而不干扰其他写入请求
  • 原子计数器实例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    import boto3
    from decimal import Decimal
    from pprint import pprint

    # 增加原子计数器:使用update_item方法递增或递减现有属性的值而不干扰其他写入请求
    def increase_rating(title, year, rating_increase, dynamodb=None):
    if not dynamodb:
    dynamodb = boto3.resource('dynamodb', endpoint_url='http://localhost:8000')

    table = dynamodb.Table('Movies')
    response = table.update_item(
    Key={
    'year': year,
    'title': title
    },
    UpdateExpression="set info.rating = info.rating + :val",
    ExpressionAttributeValues={
    ':val': Decimal(rating_increase)
    },
    ReturnValues='UPDATED_NEW'
    )
    return response

    if __name__ == '__main__':
    update_response = increase_rating('The Big New Movie', 2015, 1)
    print('Update movie succeeded:')
    pprint(update_response)
满足指定条件更新项目: update_item()
  • 注意事项

    • ExpressionAttributeValues 提供值替换功能。使用此参数是因为您不能在任何表达式 (包括 KeyConditionExpression) 中使用文本。您可使用表达式属性值 :yyyy 来解决此问题。
  • 更新实例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    import boto3
    from pprint import pprint
    from botocore.exceptions import ClientError

    # 有条件更新项目 (演员数目)
    def remove_actors(title, year, actor_count, dynamodb=None):
    if not dynamodb:
    dynamodb = boto3.resource('dynamodb', endpoint_url='http://localhost:8000')

    table = dynamodb.Table('Movies')
    try:
    response = table.update_item(
    Key={
    'year': year,
    'title': title
    },
    UpdateExpression='remove info.actors[0]',
    ConditionExpression="size(info.actors) >= :num",
    ExpressionAttributeValues={':num': actor_count},
    ReturnValues="UPDATED_NEW"
    )
    except ClientError as e:
    if e.response["Error"]["Code"] == "ConditionalCheckFailedException":
    print(e.response["Error"]["Message"])
    else:
    raise
    else:
    return response

    if __name__ == '__main__':
    print("Attempting conditional update (expecting failure) ...")
    update_response = remove_actors('The Big New Movie', 2015, 3)
    if update_response:
    print('Update movie succeeded:')
    pprint(update_response)
删除项目: delete_item()
  • delete_item()

    • 通过指定项目主键删除项目
  • 删除实例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    import boto3
    from pprint import pprint
    from decimal import Decimal
    from botocore.exceptions import ClientError

    # 删除项目 delete_item 通过指定主键删除
    def delete_underrated_movie(title, year, rating, dynamodb=None):
    if not dynamodb:
    dynamodb = boto3.resource('dynamodb', endpoint_url="http://localhost:8000")

    table = dynamodb.Table('Movies')

    try:
    response = table.delete_item(
    Key={
    'year': year,
    'title': title
    },
    ConditionExpression="info.rating >= :val",
    ExpressionAttributeValues={
    ":val": Decimal(rating)
    }
    )
    except ClientError as e:
    if e.response['Error']['Code'] == "ConditionalCheckFailedException":
    print(e.response["Error"]["Message"])
    else:
    raise
    else:
    return response

    if __name__ == '__main__':
    print("Attempting a conditional delete ...")
    delete_response = delete_underrated_movie('The Big New Movie', 2015, 5)
    if delete_response:
    print('Delete movie succeeded:')
    pprint(delete_response)

查询和扫描数据

  • query() : 检索表中数据,分区键值必选,排序键可选
  • scan() : 检索表中所有数据
单条件查询: query()
  • 注意事项

    • 使用从 ConditionExpression 导入的 KeyAttr 函数时,Boto 3 开发工具包会为您构建一个 boto3.dynamodb.conditions。您还可以字符串形式指定 ConditionExpression
  • 查询实例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    import boto3
    from boto3.dynamodb.conditions import Key

    # 根据分区键 query 查找 同一分区内数据
    # 检索 year 1985 发行的所有电影。
    def query_movies(year, dynamodb=None):
    if not dynamodb:
    dynamodb = boto3.resource('dynamodb', endpoint_url='http://localhost:8000')

    table = dynamodb.Table('Movies')

    response = table.query(
    KeyConditionExpression=Key('year').eq(year)
    )
    return response['Items']

    if __name__ == '__main__':
    query_year = 1985
    print(f'Movies from {query_year}')
    movies = query_movies(query_year)
    for movie in movies:
    print(movie['year'], ':', movie['title'])
多条件混合查询
  • 查询实例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    import boto3
    from pprint import pprint
    from boto3.dynamodb.conditions import Key

    # 多条件查询
    # 将检索 year 1992 发行并且 title 以字母“A”至字母“L”开头的所有电影
    def query_and_project_movies(year, title_range, dynamodb=None):
    if not dynamodb:
    dynamodb = boto3.resource('dynamodb', endpoint_url='http://localhost:8000')

    table = dynamodb.Table('Movies')
    print(f'获取年份、标题、类型和主角')

    # 表达式属性名称只能引用投影表达式中的项。
    response = table.query(
    ProjectionExpression="#yr, title, info.genres, info.actors[0]",
    ExpressionAttributeNames={'#yr': 'year'},
    KeyConditionExpression=
    Key('year').eq(year) & Key('title').between(title_range[0], title_range[1])
    )
    return response['Items']

    if __name__ == '__main__':
    query_year = 1992
    query_range = ('A', 'L')
    print(f'Get movies from {query_year} with titles from {query_range[0]} to {query_range[1]}')
    movies = query_and_project_movies(query_year, query_range)
    for movie in movies:
    print(f'\n{movie["year"]} : {movie["title"]}')
    pprint(movie['info'])
扫描数据: scan()
  • scan()

    • 读取整个表中的所有项目,并返回表中的所有数据
    • 提供一个可选的 filter_expression,以便仅返回符合条件的项目。但是,筛选条件仅在扫描整个表后应用。
  • 注意事项

    • FilterExpression 用于指定一个条件,以便仅返回符合条件的项目。所有其他项目都将被舍弃
    • ProjectionExpression 用于指定要在扫描结果中包含的属性。
    • 响应中的 LastEvaluatedKey 值随后通过 scan 参数传递给 ExclusiveStartKey 方法。当返回最后一页后,LastEvaluatedKey 将不是响应的一部分。
    • ExpressionAttributeNames 提供名称替换功能。我们使用这个是因为year是 DynamoDB 中的保留字您不能直接在任何表达式中使用它,包括KeyConditionExpression。您可使用表达式属性名称 #yr 来解决此问题
  • 扫描实例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    import boto3
    from pprint import pprint
    from boto3.dynamodb.conditions import Key

    # scan 方法将读取整个表中的所有项目,并返回表中的所有数据。
    # 可提供一个可选的 filter_expression,以便仅返回符合条件的项目
    # 筛选条件仅在扫描整个表后应用, 不适合单纯的查找操作

    # 扫描整个 Movies 表,表中包含约 5000 个项目。
    # 扫描时可指定可选筛选条件,以便仅检索 20 世纪 50 年代以来的电影 (约 100 个项目),同时舍弃所有其他项目
    def scan_movies(year_range, display_movies, dynamodb=None):
    if not dynamodb:
    dynamodb = boto3.resource('dynamodb', endpoint_url='http://localhost:8000')

    table = dynamodb.Table("Movies")
    scan_kwargs = {
    # 指定一个条件,以便仅返回符合条件的项目。所有其他项目都将被舍弃。
    'FilterExpression': Key('year').between(*year_range),
    # 指定要在扫描结果中包含的属性
    'ProjectionExpression': "#yr, title, info.rating",
    # ExpressionAttributeNames 提供名称替换功能。我们使用这个是因为year是 DynamoDB 中的保留字您不能直接在任何表达式中使用它,
    # 包括KeyConditionExpression。您可使用表达式属性名称 #yr 来解决此问题。
    'ExpressionAttributeNames': {'#yr': "year"}
    }

    done = False
    start_key = None
    while not done:
    if start_key:
    scan_kwargs['ExclusiveStartKey'] = start_key
    response = table.scan(**scan_kwargs)
    # 调用外部的 print_movies 进行输出
    display_movies(response.get('Items', []))
    start_key = response.get('LastEvaluatedKey', None)
    done = start_key is None

    if __name__ == '__main__':
    def print_movies(movies):
    for movie in movies:
    print(f'\n{movie["year"]} : {movie["title"]}')
    pprint(movie['info'])

    query_range = (1950, 1959)
    print(f"Scanning for movies released from {query_range[0]} to {query_range[1]}...")
    scan_movies(query_range, print_movies)

删除表

删除已创建的表: delete()
  • 删除实例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    import boto3

    def delete_movie_table(dynamodb=None):
    if not dynamodb:
    dynamodb = boto3.resource('dynamodb', endpoint_url="http://localhost:8000")

    table = dynamodb.Table('Movies')
    table.delete()

    if __name__ == '__main__':
    delete_movie_table()
    print('Movies table deleted.')