Package amplee :: Package atompub :: Package member :: Module helper
[hide private]
[frames | no frames]

Source Code for Module amplee.atompub.member.helper

  1  # -*- coding: utf-8 -*- 
  2  __docformat__ = 'epytext en' 
  3   
  4  __doc__ = """ 
  5  Member classes share a common set of functionalities that this module tries 
  6  to centralize to make the code easier to read and maintain 
  7  """ 
  8   
  9  import copy 
 10  from urllib import quote 
 11   
 12  import amara 
 13  from amplee.utils import ATOM10_PREFIX, ATOMPUB_PREFIX, XML_PREFIX, XML_NS, \ 
 14       ATOM10_NS, ATOMPUB_NS, XHTML1_NS, XHTML1_PREFIX, THR_NS 
 15   
 16  from amplee.utils import generate_uuid_uri, get_isodate, safe_quote, \ 
 17       safe_url_join, qname 
 18   
19 -class MemberHelper(object):
20 - def __init__(self, collection, existing_member=None):
21 self.collection = collection 22 self.existing_member = existing_member 23 self.entry = None
24
def initiate(self, id):
    """
    Creates a default Atom entry document with its id, published, updated and
    edited children set, and stores that document in C{self.entry}.

    When C{self.existing_member} is set, the text of its C{published} and
    C{updated} children is reused; each of them falls back to the current
    date when the existing member does not provide it. The C{edited} child
    always receives the current date. If the collection has a C{xml_base}
    attribute set it will also be set on the entry.

    @type id: unicode
    @param id: identifier of the entry
    """
    doc = amara.create_document(qname(u'entry', ATOM10_PREFIX), ns=ATOM10_NS,
                                prefixes={ATOM10_PREFIX: ATOM10_NS,
                                          ATOMPUB_PREFIX: ATOMPUB_NS})
    entry = doc.entry
    entry.xml_append(doc.xml_create_element(qname(u"id", ATOM10_PREFIX),
                                            ns=ATOM10_NS, content=id))

    isodate = get_isodate()

    for name in (u"published", u"updated"):
        date = isodate
        if self.existing_member is not None:
            # Each date is checked against its own element, so a missing
            # atom:updated never ends up as the text of None and an existing
            # one is never discarded because atom:published is absent.
            previous = self.existing_member.atom.xml_child_elements.get(name)
            if previous:
                date = unicode(previous)
        entry.xml_append(doc.xml_create_element(qname(name, ATOM10_PREFIX),
                                                ns=ATOM10_NS, content=date))

    # app:edited is always stamped with the current date.
    entry.xml_append(doc.xml_create_element(qname(u"edited", ATOMPUB_PREFIX),
                                            ns=ATOMPUB_NS, content=isodate))

    if self.collection.xml_base:
        entry.xml_set_attribute((qname(u'base', XML_PREFIX), XML_NS),
                                self.collection.xml_base)
    self.entry = doc
74 90 115
def add_remote_content(self, media_id, media_type):
    """
    Adds an C{atom:content} element carrying C{src} and C{type} attributes.

    The C{src} value is built by joining the value returned by
    C{self.collection.get_base_uri()} with C{media_id}.

    @type media_id: unicode
    @param media_id: value to be used for the last segment of the IRI

    @type media_type: unicode
    @param media_type: the mime type of the remote resource
    """
    base_uri = self.collection.get_base_uri()
    attributes = {}
    attributes[u'src'] = safe_url_join([base_uri, media_id])
    attributes[u'type'] = media_type
    self.add_element('content', attributes=attributes)
132
def add_element(self, name, content=None, attributes=None,
                prefix=None, ns=None, parent=None):
    """
    Creates a new element and appends it to a parent element.

    @type name: unicode
    @param name: name of the element

    @type content: unicode
    @param content: value of the element

    @type attributes: dict
    @param attributes: dictionary of attributes to set (keys and values
        must be unicode)

    @type prefix: unicode
    @param prefix: XML prefix (if not provided defaults to the one of the
        parent element)

    @type ns: unicode
    @param ns: XML namespace (if not provided defaults to the one of the
        parent element)

    @type parent: L{amara.bindery.root_base} or subclass
    @param parent: if provided the child will be attached to parent,
        otherwise C{parent} will default to C{self.entry.entry}

    @rtype: L{amara.bindery.root_base} subclass
    @return: the newly created and added child
    """
    # Any falsy argument falls back to its default, not only None.
    parent = parent or self.entry.entry
    prefix = prefix or parent.prefix
    ns = ns or parent.namespaceURI

    document = parent.ownerDocument
    child = document.xml_create_element(qname(name, prefix), ns=ns,
                                        content=content,
                                        attributes=attributes)
    parent.xml_append(child)
    return child
173
def copy_element(self, name, source=None, destination=None, ns=None):
    """
    Copies an element into another element.

    The child is looked up by C{name} in C{source.xml_child_elements} and
    deep-copied; the first child node of the copy's owner document is then
    appended to C{destination}.

    @type name: unicode
    @param name: name of the element to copy

    @type source: L{amara.bindery.root_base} subclass
    @param source: element containing a child C{name}; when C{None}
        nothing is copied

    @type destination: L{amara.bindery.root_base} subclass
    @param destination: element to which the copy is attached (defaults
        to C{self.entry.entry})

    @type ns: unicode
    @param ns: accepted for interface compatibility; the lookup is done
        by name only and this value is not used

    @rtype: L{amara.bindery.root_base} subclass
    @return: the newly copied element, or C{None} when there was nothing
        to copy
    """
    # Nothing to copy from: report it the same way as a missing child
    # instead of failing on the attribute lookup below.
    if source is None:
        return None

    children = source.xml_child_elements
    if name not in children:
        return None

    if not destination:
        destination = self.entry.entry

    clone = copy.deepcopy(children[name])
    destination.xml_append(clone.ownerDocument.childNodes[0])
    return clone
202
def copy_elements(self, name, source=None, destination=None, ns=None):
    """
    Copies a list of elements into another element.

    Every item obtained by iterating over the C{name} attribute of
    C{source} is deep-copied, and the first child node of each copy's
    owner document is appended to C{destination.xml_children}.

    @type name: unicode
    @param name: name of the elements to copy

    @type source: L{amara.bindery.root_base} subclass
    @param source: element containing children C{name}; when it has no
        such attribute nothing is copied

    @type destination: L{amara.bindery.root_base} subclass
    @param destination: element to which the copies are attached
        (defaults to C{self.entry.entry})

    @type ns: unicode
    @param ns: accepted for interface compatibility; the lookup is done
        by name only and this value is not used

    @rtype: list
    @return: the copied elements, empty when nothing was copied, so that
        callers can test whether anything was found
    """
    if not destination:
        destination = self.entry.entry

    clones = []
    for child in getattr(source, name, []):
        clone = copy.deepcopy(child)
        # NOTE(review): copy_element attaches through xml_append while
        # this appends to xml_children directly -- confirm both leave
        # the destination in the same state.
        destination.xml_children.append(clone.ownerDocument.childNodes[0])
        clones.append(clone)
    return clones
228
def is_draft(self, source):
    """
    Tells whether C{source} is flagged as a draft.

    @type source: L{amara.bindery.root_base}
    @param source: amara instance against which the XPath
        '/atom:entry/app:control/app:draft' is evaluated

    @rtype: bool
    @return: C{True} when the first matched node has the text C{yes},
        C{False} otherwise (including when nothing matches)
    """
    matches = source.xml_xpath('/atom:entry/app:control/app:draft')
    if not matches:
        return False
    return unicode(matches[0]) == u'yes'
244
def requires_summary(self):
    """
    Returns C{True} if the entry requires an atom:summary
    to be added based on section 4.1.2 of RFC 4287.

    RFC 4287 demands an atom:summary when the atom:content has a "src"
    attribute (and is thus empty), or when the content is encoded in
    Base64, i.e. its "type" attribute is a MIME media type that is not an
    XML media type, does not begin with "text/" and does not end with
    "/xml" or "+xml".

    @rtype: bool
    @return: returns C{True} if the entry requires an atom:summary
    """
    content = self.entry.entry.xml_child_elements.get('content')
    if not content:
        # NOTE(review): the indentation of this listing was lost; the
        # trailing else branch is read as pairing with the content test,
        # so an entry without atom:content reports True -- confirm.
        return True

    attributes = content.attributes
    src = attributes.get('src')
    mime_type = attributes.get('type')

    if src:
        # Remote content: the element is empty, a summary is needed.
        return True
    if not mime_type:
        return False

    media_type = mime_type.xml_text
    # _xml_media_types is not defined in the visible part of this module;
    # presumably a collection of XML media type names -- verify.
    if media_type in _xml_media_types:
        return False
    if media_type.startswith("text/"):
        return False
    if media_type.endswith("/xml") or media_type.endswith("+xml"):
        return False
    return True
278
def requires_author(self):
    """
    Returns C{True} if the entry requires an atom:author
    to be added based on section 4.1.2 of RFC 4287.

    RFC 4287 states that atom:entry elements MUST contain one or more
    atom:author elements, unless the atom:entry contains an atom:source
    element that contains an atom:author element or, in an Atom Feed
    Document, the atom:feed element contains an atom:author element
    itself.

    Only the entry's own C{author} child is inspected here; the
    atom:source and atom:feed exceptions are not checked.

    @rtype: bool
    @return: returns C{True} if the entry requires an atom:author
    """
    own_author = self.entry.entry.xml_child_elements.get('author')
    return not own_author
308
def validate(self):
    """
    Validates this entry by following section 4.1.2 of RFC 4287.

    Missing mandatory children are added with empty values:
     - an C{atom:title} when the entry has none
     - an C{atom:summary} when C{requires_summary()} reports one is needed
       and the entry does not already have one
     - an C{atom:author} with an empty C{atom:name} when
       C{requires_author()} reports one is needed
     - an C{atom:content} when the entry has neither a content child nor
       an C{atom:link} with C{rel="alternate"}
    """
    entry = self.entry.entry
    children = entry.xml_child_elements

    if 'title' not in children:
        self.add_element('title', content=u'', attributes={u'type': u'text'})

    # requires_summary() does not look at existing summaries, and an entry
    # must not carry more than one atom:summary, so only add one when none
    # is present yet.
    if 'summary' not in children and self.requires_summary():
        self.add_element(u'summary', content=u'', attributes={u'type': u'text'})

    if self.requires_author():
        author = self.add_element('author')
        self.add_element(u'name', content=u'', parent=author)

    if 'content' not in entry.xml_child_elements:
        # atom:entry elements that contain no child atom:content element
        # MUST contain at least one atom:link element with a rel attribute
        # value of "alternate".
        links = entry.xml_xpath('atom:link[@rel="alternate"]')
        if not links:
            self.add_element(u'content', content=u'', attributes={u'type': u'text'})
333
def remove_duplicates(self, name, ns, prefix):
    """
    Removes duplicate elements in the entry and keeps only the first one found.

    The elements are located with the XPath C{prefix:name} evaluated
    against C{self.entry.entry}.

    @type name: unicode
    @param name: name of the element to lookup and remove

    @type ns: unicode
    @param ns: namespace of the element to lookup and remove (not used by
        the lookup, which relies on C{prefix} only)

    @type prefix: unicode
    @param prefix: prefix of the element to lookup and remove
    """
    entry = self.entry.entry
    children = entry.xml_xpath("%s:%s" % (prefix, name))
    # Skip the first match so that exactly one element survives.
    for duplicate in children[1:]:
        entry.xml_remove_child(duplicate)
351
def copy_from(self, entry, include_content=True):
    """
    Copies elements from the C{entry} into C{self.entry} and ensures
    this process keeps C{self.entry} valid as per RFC 4287.

    The title, summary, authors, links, C{in-reply-to}, categories and,
    optionally, the content are copied. Empty title, summary, author and
    content elements are added when the source provides none and the
    entry needs them.

    @type entry: L{amara.bindery.root_base}
    @param entry: source entry

    @type include_content: bool
    @param include_content: indicates whether or not we are interested
        in copying the content as well
    """
    # A title is mandatory: add an empty one when the source had none.
    if not self.copy_element('title', source=entry):
        self.add_element(u'title', content=u'', attributes={u'type': u'text'})

    # No summary to copy from: only add one when the entry needs it.
    if not self.copy_element('summary', source=entry):
        if self.requires_summary():
            self.add_element(u'summary', content=u'', attributes={u'type': u'text'})

    # No authors to copy from: only add one when the entry needs it.
    if not self.copy_elements('author', source=entry):
        if self.requires_author():
            placeholder = self.add_element('author')
            self.add_element(u'name', content=u'', parent=placeholder)

    # Copy the atom:link elements from the source, except the rel="edit"
    # and rel="edit-media" ones.
    # NOTE(review): the links are looked up in the source entry but their
    # removal is requested on self.entry -- confirm this really keeps the
    # edit links out of the copy.
    for rel in ('edit', 'edit-media'):
        for link in entry.xml_xpath('atom:link[@rel="%s"]' % rel):
            self.entry.xml_remove_child(link)
    self.copy_elements('link', source=entry)
    self.copy_element('in-reply-to', source=entry, ns=THR_NS)
    self.copy_elements('category', source=entry)

    copied_content = None
    if include_content:
        copied_content = self.copy_element('content', source=entry)
    if copied_content:
        return

    # atom:entry elements that contain no child atom:content element
    # MUST contain at least one atom:link element with a rel attribute
    # value of "alternate".
    if not entry.xml_xpath('atom:link[@rel="alternate"]'):
        self.add_element(u'content', content=u'', attributes={u'type': u'text'})
409
410 - def clone(self, entry, preserve_id=False, preserve_dates=False, include_content=True):
411 """ 412 Makes a clone of the provided C{entry} but keeps from C{self.entry} some 413 element values based on the parameters of the function call. 414 415 At the end of the function, C{self.entry} is set to the clone of C{entry}. 416 417 @type entry: L{amara.bindery.root_base} 418 @param entry: entry that should be cloned and set to C{self.entry} 419 420 @type preserve_id: bool 421 @param preserve_id: indicates if C{self.entry.id} must be kept. 422 423 @type preserve_dates: bool 424 @param preserve_dates: if C{False} then C{self.entry.published}, 425 C{self.entry.updated} and C{self.entry.edited} are kept from C{self.entry} 426 Otherwise, use the ones provided in the clone. 427 """ 428 cloned_entry = copy.deepcopy(entry) 429 root = cloned_entry.entry 430 entry = self.entry.entry 431 432 if not preserve_id: 433 if hasattr(root, 'id'): 434 root.id = unicode(entry.id) 435 else: 436 self.add_element(name=u'id', content=unicode(entry.id), 437 parent=root)