Exploring the datafeed API in CubicWeb
The datafeed API is one of the nice features of the CubicWeb framework. It makes it easy to build things such as a news aggregator (or even a semantic news feed reader), an LDAP importer or an application importing data from another web platform. The underlying API is quite flexible and powerful. Yet, because the documentation is quite thin, it can be hard to find one's way through it. In this article, we'll describe the basics of the datafeed API and provide guiding examples.
The datafeed API is essentially built around two things: a CWSource entity and a parser, which is a kind of AppObject.
The CWSource entity defines a list of URLs from which to fetch data to be imported into the current CubicWeb instance. It is linked to a parser through the parser's regid. So something like the following should be enough to create a usable datafeed source:
    create_entity('CWSource', name=u'some name', type=u'datafeed', parser=u'myparser')
The parser is usually a subclass of DataFeedParser (from cubicweb.server.sources.datafeed). It should at least implement the two methods process and before_entity_copy. To make it easier, there are specialized parsers such as DataFeedXMLParser that already define process so that subclasses only have to implement the process_item method.
Overview of the datafeed API
Before going into further details about the actual implementation of a DataFeedParser, it's worth having in mind a few details about the datafeed parsing and import process. This involves various players from the CubicWeb server, namely: a DataFeedSource (from cubicweb.server.sources.datafeed), the Repository and the DataFeedParser.
- Everything starts from the Repository, which loops over its sources and pulls data from each of them (this is done using a looping task which is set up at repository startup). In the case of datafeed sources, Repository sources are instances of the aforementioned DataFeedSource class.
- The DataFeedSource selects the appropriate parser from the registry and loops on each uri defined in the respective CWSource entity by calling the parser's process method with that uri as argument (methods pull_data and process_urls of DataFeedSource).
- If the parsing step succeeds, the DataFeedSource will call the parser's handle_deletion method, with the URIs of the previously imported entities.
- Then, the import log is formatted and the transaction committed. The DataFeedSource and DataFeedParser are connected to an import_log which feeds the CubicWeb instance with a CWDataImport per data pull. This usually contains the number of created and updated entities along with any error/warning message logged by the parser. All this is visible in a table from the CWSource primary view.
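The sequence above can be pictured with a small standalone model. This is only a sketch and does not use CubicWeb: ToySource, ToyParser, the boolean error flag returned by process, and the convention that each URL stands for a single imported item are all assumptions made for illustration.

```python
class ToyParser:
    """Stand-in for a datafeed parser; not the CubicWeb class."""

    def __init__(self, import_log):
        self.import_log = import_log
        self.seen = set()

    def process(self, url):
        """Pretend to import `url`; return True if an error occurred."""
        if not url.startswith('http'):
            self.import_log.append('error: cannot fetch %s' % url)
            return True
        # In this toy model, each URL stands for one imported item.
        self.seen.add(url)
        self.import_log.append('imported %s' % url)
        return False

    def handle_deletion(self, previously_imported):
        """Report items imported earlier that were not seen this time."""
        stale = previously_imported - self.seen
        self.import_log.append('%d stale item(s)' % len(stale))
        return stale


class ToySource:
    """Stand-in for the datafeed source driving the parser."""

    def __init__(self, urls, previously_imported=()):
        self.urls = list(urls)
        self.previously_imported = set(previously_imported)

    def pull_data(self):
        import_log = []                  # one log per pull
        parser = ToyParser(import_log)   # the "selected" parser
        errors = [parser.process(url) for url in self.urls]
        if not any(errors):
            # Deletion is only handled after a fully successful pull.
            parser.handle_deletion(self.previously_imported)
        return import_log
```

Calling ToySource(urls).pull_data() returns the log lines of one pull, which plays the role of the CWDataImport record in this simplified picture.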
So now, you might wonder what actually happens during the parser's process method call. This method takes a URL from which to fetch data and then processes each piece of data (using a process_item method for instance). For each data item:
1. the repository is queried to retrieve or create an entity in the system source: this is done using the extid2entity method;
2. this extid2entity method essentially needs two pieces of information:
   - a so-called extid, which uniquely identifies an item in the distant source
   - any other information needed to create or update the corresponding entity in the system source (this will later be referred to as the sourceparams)
3. then, given the (new or existing) entity returned by extid2entity, the parser can perform further postprocessing (for instance, updating any relation on this entity).
In step 1 above, the parser method extid2entity in turn calls the repository method extid2eid given the current source and the extid value. If an entry in the entities table matches the specified extid, the corresponding eid (identifier in the system source) is returned. Otherwise, a new eid is created. It's worth noting that the created entity (in case the entity is to be created) is not complete with respect to the data model at this point. To complete the entity, the source method before_entity_insertion is called. This is where the aforementioned sourceparams are used. More specifically, on the parser side the before_entity_copy method is called: it usually just updates (using entity.cw_set() for instance) the fetched entity with any relevant information.
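To make the lookup-or-create logic more tangible, here is a minimal standalone model of it. It is a sketch under loud assumptions: ToyRepository and ToyParser are hypothetical classes, entities are plain dicts, and only the method names (extid2eid, extid2entity, before_entity_copy) are borrowed from the description above, not their real signatures.

```python
import itertools


class ToyRepository:
    """Holds the extid -> eid table (stand-in for the entities table)."""

    def __init__(self):
        self._next_eid = itertools.count(1)
        self.extid_to_eid = {}
        self.entities = {}

    def extid2eid(self, extid):
        """Return (eid, created), allocating a new eid for unknown extids."""
        if extid in self.extid_to_eid:
            return self.extid_to_eid[extid], False
        eid = next(self._next_eid)
        self.extid_to_eid[extid] = eid
        return eid, True


class ToyParser:
    """Stand-in for a parser; entities are modelled as plain dicts."""

    def __init__(self, repo):
        self.repo = repo

    def before_entity_copy(self, entity, sourceparams):
        # Complete the bare entity with data carried by the sourceparams.
        entity.update(sourceparams['item'])

    def extid2entity(self, extid, etype, **sourceparams):
        eid, created = self.repo.extid2eid(extid)
        if created:
            # A new entity is incomplete until before_entity_copy fills it.
            entity = {'eid': eid, 'etype': etype}
            self.before_entity_copy(entity, sourceparams)
            self.repo.entities[eid] = entity
        return self.repo.entities[eid]
```

In this model, asking twice for the same extid returns the same entity, while a new extid gets a fresh eid and is completed through before_entity_copy.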
Case study: a news feeds parser
Now we'll go through a concrete example to illustrate all those fairly abstract concepts and implement a datafeed parser which can be used to import news feeds. Our parser will create entities of type FeedArticle, whose minimal data model would be:
    from yams.buildobjs import EntityType, String, RichString

    class FeedArticle(EntityType):
        title = String(fulltextindexed=True)
        uri = String(unique=True)
        author = String(fulltextindexed=True)
        content = RichString(fulltextindexed=True, default_format='text/html')
Here we'll reuse the DataFeedXMLParser, not because we have XML data to parse, but because its interface fits our purpose well: it provides item-based processing (a process_item method) and it relies on a parse method to fetch raw data. The underlying parsing of the news feed resources will be handled by feedparser.
    import feedparser
    from cubicweb.server.sources.datafeed import DataFeedXMLParser

    class FeedParser(DataFeedXMLParser):
        regid = 'newsaggregator.feed-parser'
The parse method is called by process; it should return a list of tuples with item information.
        def parse(self, url):
            """Delegate to feedparser to retrieve feed items"""
            data = feedparser.parse(url)
            # Wrap each entry in a 1-tuple; list() keeps this a real list on Python 3.
            return list(zip(data.entries))
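Calling zip on a single sequence may look odd, so here is what it produces. The sketch below uses plain dicts instead of feedparser entries, and dispatch is a hypothetical helper that assumes each tuple is unpacked into the positional arguments of process_item, which is what the "list of tuples" contract above suggests.

```python
# Plain dicts standing in for feedparser entries (illustrative data only).
entries = [{'link': 'http://example.org/a'}, {'link': 'http://example.org/b'}]

# zip over a single sequence wraps each element in a 1-tuple.
items = list(zip(entries))


def process_item(entry):
    """Toy item handler: return the link of the entry."""
    return entry['link']


def dispatch(parsed):
    """Hypothetical loop: unpack each tuple into process_item's arguments."""
    return [process_item(*args) for args in parsed]


links = dispatch(items)
```

With 1-tuples, process_item receives exactly one positional argument per item, namely the entry itself.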
Then the process_item method takes an individual item (i.e. an entry of the result obtained from feedparser in our case). It essentially defines an extid, here the URI of the feed entry (a good candidate for uniqueness), and calls extid2entity with that extid, the entity type to be created or retrieved, and any additional data useful for entity completion passed as keyword arguments. (The process_feed method call just transforms the results obtained from feedparser into a dict suitable for entity creation following the data model described above.)
        def process_item(self, entry):
            data = self.process_feed(entry)
            # The URI of the feed entry serves as the external identifier.
            extid = data['uri']
            entity = self.extid2entity(extid, 'FeedArticle', feeddata=data)
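The process_feed helper is not shown in this article. A possible shape for it, written here as a plain function rather than a method, could look like the following sketch. The choice of entry keys (link, title, author, summary) and the empty-string defaults are assumptions; the actual helper in the news aggregator cube may differ.

```python
def process_feed(entry):
    """Map a feed entry to a dict of FeedArticle attributes.

    `entry` is any mapping with feedparser-like keys. Only `link` is
    required, since it becomes the extid; other keys default to ''.
    """
    return {
        'uri': entry['link'],
        'title': entry.get('title', ''),
        'author': entry.get('author', ''),
        'content': entry.get('summary', ''),
    }
```

The keys of the returned dict match the attributes of the FeedArticle data model, so the dict can be passed along as feeddata and applied to the entity as a whole.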
The before_entity_copy method is called before the entity is actually created (or updated) in order to give the parser a chance to complete it with any other attribute that could be set from source data (namely feedparser data in our case).
        def before_entity_copy(self, entity, sourceparams):
            feeddata = sourceparams['feeddata']
            entity.cw_edited.update(feeddata)
And this is essentially all that's needed for a simple parser. Further details can be found in the news aggregator cube. More sophisticated parsers may use other concepts not described here, such as source mappings.
Testing datafeed parsers
Testing a datafeed parser often involves pulling data from the corresponding datafeed source. Here is a minimal test snippet that illustrates how to retrieve the datafeed source from a CWSource entity and to pull data from it.
    with self.admin_access.repo_cnx() as cnx:
        # Assuming one knows the URI of a CWSource.
        rset = cnx.execute('CWSource X WHERE X uri %(uri)s', {'uri': uri})
        # Retrieve the datafeed source instance from the eid of the first result.
        dfsource = self.repo.sources_by_eid[rset[0][0]]
        # Make sure its parser matches the expected one.
        self.assertEqual(dfsource.parser_id, '<my-parser-id>')
        # Pull data using an internal connection.
        with self.repo.internal_cnx() as icnx:
            stats = dfsource.pull_data(icnx, force=True, raise_on_error=True)
            icnx.commit()
The resulting stats is a dictionary containing the eids of the entities created and updated during the pull. In addition, all created entities should have their cw_source relation set to the corresponding CWSource entity.
It is possible to add some configuration to the CWSource entity in the form of a string of configuration items (one per line). Noteworthy items are:
The mapping between CWSource entities' type (e.g. "datafeed") and DataFeedSource object is quite unusual as it does not rely on the vreg but uses a specific sources registry (defined in cubicweb.server.SOURCE_TYPES).