Q&A
現在複数のECサイトをscrapyでスクレイピングし、取得したデータをdynamoDBに登録するプログラムを開発しています。
この時、取得した商品情報をproduct_tableに、店舗情報をstore_tableに格納しようと思っています。
そこでスクレイピングしたデータごとにテーブルを切り替えたいのですが、自力で調べてもその方法が分からない状態です。
Scrapyで取得したデータをdynamoDBの複数のテーブルに登録する方法を、ご教授いただけないでしょうか?
下記にscrapyのsettings.pyとpipelines.pyのコードを載せます。
pipelines.pyのコードは以下のものをお借りさせていただきました。
https://github.com/acordiner/scrapy-dynamodb
python
1ITEM_PIPELINES = { 2 'myproject.pipelines.DynamoDBPipeline': 300, 3} 4 5AWS_ACCESS_KEY_ID = 'AWS_ACEESS_KEY_ID' 6AWS_SECRET_ACCESS_KEY = 'AWS_SECRET_ACCESS_KEY' 7DYNAMODB_REGION_NAME = 'ap-northeast-1' 8DYNAMODB_TABLE_NAME = 'product_table' 9DYNAMODB_ENDPOINT_URL = 'http://192.168.33.10:8000'
python
1# -*- coding: utf-8 -*- 2 3# Define your item pipelines here 4# 5# Don't forget to add your pipeline to the ITEM_PIPELINES setting 6# See: https://doc.scrapy.org/en/latest/topics/item-pipeline.html 7 8import boto3 9 10def default_encoder(value, empty_str_replacement = 'empty'): 11 import datetime 12 # For datetime 13 if isinstance(value, datetime.datetime): 14 return value.strftime('%Y-%m-%d %H:%M:%S') 15 elif isinstance(value, datetime.date): 16 return value.strftime('%Y-%m-%d') 17 elif isinstance(value, datetime.time): 18 return value.strftime('%H:%M:%S') 19 else: 20 # For empty str 21 if isinstance(value, str): 22 if value == '': 23 value = empty_str_replacement 24 return value 25 26 27class DynamoDBPipeline(object): 28 29 def __init__(self, crawler, encoder=default_encoder): 30 self.encoder = encoder 31 self.aws_access_key_id = crawler.settings['AWS_ACCESS_KEY_ID'] 32 self.aws_secret_access_key = crawler.settings['AWS_SECRET_ACCESS_KEY'] 33 self.region_name = crawler.settings['DYNAMODB_REGION_NAME'] 34 self.endpoint_url = crawler.settings['DYNAMODB_ENDPOINT_URL'] 35 self.table_name = crawler.settings['DYNAMODB_TABLE_NAME'] 36 self.table = None 37 38 @classmethod 39 def from_crawler(cls, crawler): 40 return cls(crawler) 41 42 def open_spider(self, spider): 43 db = boto3.resource( 44 'dynamodb', 45 aws_access_key_id=self.aws_access_key_id, 46 aws_secret_access_key=self.aws_secret_access_key, 47 region_name=self.region_name, 48 endpoint_url=self.endpoint_url 49 ) 50 self.table = db.Table(self.table_name) 51 52 def close_spider(self, spider): 53 self.table = None 54 55 def process_item(self, item, spider): 56 self.table.put_item( 57 TableName=self.table_name, 58 Item={k: self.encoder(v) for k, v in item.items()}, 59 # TODO: add condintion extension, example is below 60 # ExpressionAttributeNames={"#name" : "name",}, 61 # ConditionExpression='attribute_not_exists(#name) AND attribute_not_exists(category)', 62 ) 63 return item 64