3 changes: 2 additions & 1 deletion requirements.txt
@@ -1 +1,2 @@
-scrapy
+scrapy
+simplejson
2 changes: 0 additions & 2 deletions scrapy_streaming/commands/streaming.py
@@ -1,5 +1,3 @@
-import os
-
 from scrapy.commands import ScrapyCommand
 from scrapy.exceptions import UsageError
 
2 changes: 1 addition & 1 deletion scrapy_streaming/communication/__init__.py
@@ -1,3 +1,3 @@
 from scrapy_streaming.communication.map import CommunicationMap
-from scrapy_streaming.communication.wrappers import *
+from scrapy_streaming.communication.validators import *

@@ -9,21 +9,21 @@ class LineProcessProtocol(protocol.ProcessProtocol, object):
     """
 
     def __init__(self):
-        self.__buffer = b''
-        self.__delimiter = b'\n'
+        self._buffer = b''
+        self._delimiter = b'\n'
 
     def outReceived(self, data):
        """
        Implement the outReceived method, buffering the incoming data and
        dispatching line by line in the ``lineReceived`` method.
        """
-        self.__buffer += data
+        self._buffer += data
 
-        lines = self.__buffer.splitlines()
-        if data.endswith(self.__delimiter):
-            self.__buffer = b''
+        lines = self._buffer.splitlines()
+        if data.endswith(self._delimiter):
+            self._buffer = b''
         else:
-            self.__buffer = lines.pop()
+            self._buffer = lines.pop()
 
         for line in lines:
             self.lineReceived(line)
@@ -40,5 +40,5 @@ def writeLine(self, data):
         """
         data = to_bytes(data)
         if not data.endswith(b'\n'):
-            data += self.__delimiter
+            data += self._delimiter
         self.transport.write(data)
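
The rename from the name-mangled `__buffer`/`__delimiter` to single-underscore `_buffer`/`_delimiter` presumably makes these attributes reachable from subclasses of `LineProcessProtocol`. The buffering itself is easiest to follow in isolation; below is a minimal sketch of the same splitlines-based logic (the `LineBuffer` class and the sample payloads are illustrative, not part of the patch):

```python
class LineBuffer(object):
    """Minimal sketch of the buffering done in outReceived (not the real protocol)."""

    def __init__(self, on_line):
        self._buffer = b''
        self._delimiter = b'\n'
        self.on_line = on_line

    def feed(self, data):
        self._buffer += data
        lines = self._buffer.splitlines()
        if data.endswith(self._delimiter):
            self._buffer = b''          # chunk ended on a delimiter: every line is complete
        else:
            self._buffer = lines.pop()  # keep the incomplete tail for the next chunk
        for line in lines:
            self.on_line(line)


received = []
buf = LineBuffer(received.append)
buf.feed(b'{"type": "log"}\n{"type": "cl')   # second message arrives split across chunks
buf.feed(b'ose"}\n')
assert received == [b'{"type": "log"}', b'{"type": "close"}']
```
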
61 changes: 39 additions & 22 deletions scrapy_streaming/communication/map.py
@@ -1,24 +1,31 @@
-import json
+import base64
+
+import simplejson as json
 
 from scrapy.utils.python import to_unicode, to_native_str
 
-from scrapy_streaming.communication import wrappers
-from scrapy_streaming.utils import MessageError
+from scrapy_streaming.communication import validators
+from scrapy_streaming.utils import MessageError, extract_instance_fields
 
 
 class CommunicationMap(object):
     """
-    Helper class to create the json messages
+    Helper class to create and receive json messages
     """
 
     mapping = {
-        'spider': wrappers.SpiderMessage,
-        'request': wrappers.RequestMessage,
-        'log': wrappers.LogMessage
+        'spider': validators.SpiderMessage,
+        'request': validators.RequestMessage,
+        'from_response_request': validators.FromResponseRequestMessage,
+        'log': validators.LogMessage,
+        'close': validators.CloseMessage
     }
 
     @staticmethod
     def parse(line):
+        """
+        Receives a json string in a line, that will be decoded and parsed to a message
+        """
         try:
             msg = json.loads(to_native_str(line))
 
@@ -28,10 +35,10 @@ def parse(line):
                 raise MessageError('"type" field not provided.')
 
             msg_type = msg.pop('type')
-            try:
-                return CommunicationMap.mapping[msg_type].from_dict(msg)
-            except KeyError:
+            if msg_type not in CommunicationMap.mapping:
                 raise MessageError('%s is not a valid message type.' % msg_type)
+
+            return CommunicationMap.mapping[msg_type].from_dict(msg, line)
         except ValueError:
             raise MessageError('Received message is not a valid json.')
 
@@ -48,18 +55,28 @@ def error(message, details):
         return json.dumps(fields)
 
     @staticmethod
-    def response(resp, request_id='parse'):
-        fields = _extract_fields(resp, ['url', 'headers', 'status', 'body', 'meta', 'flags'])
-        fields['id'] = to_unicode(request_id)
+    def response(resp, encode64):
+        fields = extract_instance_fields(resp, ['url', 'headers', 'status', 'meta', 'flags'])
+        if encode64:
+            fields['body'] = base64.b64encode(resp.body)
+        else:
+            # validates if the body is text-like and serializable
+            try:
+                json.dumps(resp.body)  # throws UnicodeDecodeError if not text-serializable
+                fields['body'] = resp.body
+            except UnicodeDecodeError:
+                raise ValueError('Response body is not serializable. If it\'s returning binary data, '
+                                 'set the "base64" to True to encode the data.')
+
+        fields['id'] = resp.meta['request_id']
         fields['type'] = 'response'
 
         return json.dumps(fields)
+
+    @staticmethod
+    def exception(line, exception):
+        fields = {'type': 'exception',
+                  'received_message': to_unicode(line),
+                  'exception': to_unicode(exception)}
+
-def _extract_fields(item, fields):
-    """
-    Given a list of fields, generate a dict with key being the name of the field
-    mapping to the serialized item.field value
-    """
-    data = {}
-    for field in fields:
-        data[field] = json.loads(json.dumps(getattr(item, field)))
-    return data
+        return json.dumps(fields)
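
Taken together, `CommunicationMap` now parses five incoming message types ('spider', 'request', 'from_response_request', 'log', 'close') and serializes 'response' and 'exception' messages in the other direction. A rough sketch of the newline-delimited JSON involved, with illustrative field values only (the URL and id below are made up):

```python
import simplejson as json

# Incoming lines: one JSON object per line, dispatched on the 'type' field
# through CommunicationMap.mapping.
log_line = json.dumps({'type': 'log', 'message': 'starting up', 'level': 'debug'})
request_line = json.dumps({'type': 'request', 'id': 'example-id',
                           'url': 'http://example.com', 'base64': False})

# parsed = CommunicationMap.parse(request_line)  # returns a validators.RequestMessage

# Outgoing 'response' message, roughly as built by CommunicationMap.response():
# the body is base64-encoded when encode64 is True, and 'id' echoes meta['request_id'].
response_line = json.dumps({'type': 'response', 'id': 'example-id',
                            'url': 'http://example.com', 'status': 200,
                            'body': '<html>...</html>'})
```
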
142 changes: 142 additions & 0 deletions scrapy_streaming/communication/validators.py
@@ -0,0 +1,142 @@
import logging

import six

from scrapy_streaming.utils import MessageError
from scrapy_streaming.utils.fields import EmptyField, RequiredField


class MessageValidator(object):
    """
    This class is responsible for validating dicts keys and values.
    """

    validator = {}
    """
    Validators must be defined as:
        key: name of the field
        value: expected type
    All possible message fields must be defined in the validator.
    """

    def __init__(self, default, fields):
        self.data = fields
        self.validate(fields)
        self.update(default, fields)

    @classmethod
    def from_dict(cls, data, line=None):
        c = cls(data)
        c.line = line
        return c

    def validate(self, data):
        """
        This methods check if the dict ``data`` follows the validator scheme.
        If there is a problem in the validation, raises a MessageError.
        """
        validator = self.validator

        for field, value in data.items():
            if field not in validator:
                raise MessageError('Unknown message field: %s' % field)

            if value is not None and not isinstance(value, validator[field]):
                raise MessageError('%s field must be defined as %s, received: %s' %
                                   (field, validator[field].__name__, type(value).__name__))

    def update(self, default, data):
        """
        After being validated, this method can merge the ``data`` object with the default values.
        If a RequiredField was not provided, raises a MessageError.
        """
        default.update(data)
        for item, value in default.items():
            if isinstance(value, RequiredField):
                raise MessageError('Required field: %s' % item)

            if not isinstance(value, EmptyField):
                setattr(self, item, value)


class RequestMessage(MessageValidator):
    validator = {'id': six.string_types, 'url': six.string_types, 'method': six.string_types,
                 'meta': dict, 'body': six.string_types, 'headers': dict,
                 'cookies': (dict, list), 'encoding': six.string_types,
                 'priority': int, 'dont_filter': bool, 'base64': bool}

    def __init__(self, fields):
        default = {'id': RequiredField(), 'url': RequiredField(), 'method': EmptyField(),
                   'meta': EmptyField(), 'body': EmptyField(), 'headers': EmptyField(),
                   'cookies': EmptyField(), 'encoding': EmptyField(), 'priority': EmptyField(),
                   'dont_filter': EmptyField(), 'base64': False}

        super(RequestMessage, self).__init__(default, fields)
        self.data.pop('base64', None)


class Form(MessageValidator):
    validator = {'formname': six.string_types, 'formxpath': six.string_types,
                 'formcss': six.string_types, 'formnumber': int,
                 'formdata': dict, 'clickdata': dict, 'dont_click': bool,
                 # request fields
                 'method': six.string_types, 'meta': dict, 'body': six.string_types,
                 'headers': dict, 'cookies': (dict, list), 'encoding': six.string_types,
                 'priority': int, 'dont_filter': bool}

    def __init__(self, form):
        default = {'formname': EmptyField(), 'formxpath': EmptyField(),
                   'formcss': EmptyField(), 'formnumber': EmptyField(),
                   'formdata': EmptyField(), 'clickdata': EmptyField(),
                   'dont_click': EmptyField(),
                   # request fields
                   'method': EmptyField(), 'meta': EmptyField(), 'body': EmptyField(),
                   'headers': EmptyField(), 'cookies': EmptyField(), 'encoding': EmptyField(),
                   'priority': EmptyField(), 'dont_filter': EmptyField()}

        super(Form, self).__init__(default, form)


class FromResponseRequestMessage(RequestMessage):

    def __init__(self, fields):
        if 'from_response_request' not in fields:
            raise MessageError('Required field: from_response_request')
        from_response_request = fields.pop('from_response_request')

        super(FromResponseRequestMessage, self).__init__(fields)
        self.from_response_request = Form.from_dict(from_response_request)


class SpiderMessage(MessageValidator):
    validator = {'name': six.string_types, 'start_urls': list,
                 'allowed_domains': list, 'custom_settings': dict}

    def __init__(self, fields):
        default = {'name': RequiredField(), 'start_urls': RequiredField(),
                   'allowed_domains': EmptyField(), 'custom_settings': EmptyField()}

        super(SpiderMessage, self).__init__(default, fields)


class LogMessage(MessageValidator):
    validator = {'message': six.string_types, 'level': six.string_types}

    def __init__(self, fields):
        default = {'message': RequiredField(), 'level': RequiredField()}

        super(LogMessage, self).__init__(default, fields)
        levels = {'CRITICAL': logging.CRITICAL, 'ERROR': logging.ERROR,
                  'WARNING': logging.WARNING, 'INFO': logging.INFO,
                  'DEBUG': logging.DEBUG}

        if self.level.upper() not in levels:
            raise MessageError('Invalid log level: %s' % self.level)

        self.level = levels[self.level.upper()]


class CloseMessage(MessageValidator):

    def __init__(self, fields):
        super(CloseMessage, self).__init__({}, fields)
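
A short illustration of how these validators behave in practice. This is a sketch that assumes the `scrapy_streaming` package from this branch is importable; the message contents are made up:

```python
from scrapy_streaming.communication.validators import LogMessage, RequestMessage
from scrapy_streaming.utils import MessageError

# Valid log message: required fields present; LogMessage converts the textual
# level into the numeric logging constant.
log = LogMessage.from_dict({'message': 'spider started', 'level': 'info'})
print(log.level)  # 20, i.e. logging.INFO

# Missing required field -> MessageError raised by MessageValidator.update().
try:
    RequestMessage.from_dict({'id': 'example-id'})  # 'url' is required
except MessageError as exc:
    print(exc)

# Wrong field type -> MessageError raised by MessageValidator.validate().
try:
    RequestMessage.from_dict({'id': 'example-id', 'url': 'http://example.com',
                              'priority': 'high'})  # priority must be an int
except MessageError as exc:
    print(exc)
```
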
66 changes: 0 additions & 66 deletions scrapy_streaming/communication/wrappers.py

This file was deleted.

7 changes: 3 additions & 4 deletions scrapy_streaming/external_spiderloader.py
@@ -1,9 +1,9 @@
-import json
+import simplejson as json
 import os
 
 from twisted.internet import reactor
 
-from scrapy_streaming.process_streaming import ProcessStreamingProtocol
+from scrapy_streaming.streaming import StreamingProtocol
 from scrapy_streaming.utils import get_project_root
 
 
@@ -34,7 +34,6 @@ def __init__(self, settings, load_spiders=True):
 
         if load_spiders:
             path = settings.get('EXTERNAL_SPIDERS_PATH', get_project_root())
-            # TODO add EXTERNAL_SPIDERS_PATH in docs
             path = os.path.abspath(path)
             self.external = os.path.join(path, 'external.json')
             self._fetch_spiders()
@@ -68,7 +67,7 @@ def crawl(self, name_or_spider):
         if not isinstance(name_or_spider, ExternalSpider):
             name_or_spider = self._spiders[name_or_spider]
 
-        protocol = ProcessStreamingProtocol()
+        protocol = StreamingProtocol()
         reactor.spawnProcess(protocol, name_or_spider.command, args=[name_or_spider.command] + name_or_spider.args)
         reactor.run()
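
With `ProcessStreamingProtocol` replaced by `StreamingProtocol`, the crawl path still spawns the external spider process under the Twisted reactor. A hedged usage sketch follows; the loader class name is inferred from the module, and 'my_external_spider' is a made-up entry assumed to exist in external.json:

```python
from scrapy.utils.project import get_project_settings
from scrapy_streaming.external_spiderloader import ExternalSpiderLoader

# Reads external spider definitions from EXTERNAL_SPIDERS_PATH (or the project
# root) and launches one of them; crawl() calls reactor.spawnProcess with the
# spider's command and args, wiring the process to StreamingProtocol.
loader = ExternalSpiderLoader(get_project_settings())
loader.crawl('my_external_spider')
```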
