Package flumotion :: Package component :: Package misc :: Package porter :: Module porter
[hide private]

Source Code for Module flumotion.component.misc.porter.porter

  1  # -*- Mode: Python; test-case-name: flumotion.test.test_porter -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3   
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007,2008,2009 Fluendo, S.L. 
  6  # Copyright (C) 2010,2011 Flumotion Services, S.A. 
  7  # All rights reserved. 
  8  # 
  9  # This file may be distributed and/or modified under the terms of 
 10  # the GNU Lesser General Public License version 2.1 as published by 
 11  # the Free Software Foundation. 
 12  # This file is distributed without any warranty; without even the implied 
 13  # warranty of merchantability or fitness for a particular purpose. 
 14  # See "LICENSE.LGPL" in the source distribution for more information. 
 15  # 
 16  # Headers in this file shall remain intact. 
 17   
 18  import os 
 19  import random 
 20  import socket 
 21  import string 
 22  import time 
 23  from urllib2 import urlparse 
 24   
 25  from twisted.cred import portal 
 26  from twisted.internet import protocol, reactor, error, defer 
 27  from twisted.spread import pb 
 28  from zope.interface import implements 
 29   
 30  from flumotion.common import medium, log, messages, errors 
 31  from flumotion.common.i18n import N_, gettexter 
 32  from flumotion.component import component 
 33  from flumotion.component.component import moods 
 34  from flumotion.twisted import fdserver, checkers 
 35  from flumotion.twisted import reflect 
 36   
 37  __version__ = "$Rev$" 
 38  T_ = gettexter() 
 39   
 40   
class PorterAvatar(pb.Avatar, log.Loggable):
    """
    An Avatar in the porter representing a streamer.

    The avatar forwards the (de)registration requests a streamer makes over
    PB to the L{Porter} it was created for, and keeps the streamer's mind so
    that file descriptors can later be sent over its transport.
    """

    def __init__(self, avatarId, porter, mind):
        """
        @param avatarId: the identifier the streamer logged in with
        @param porter:   the porter that registrations are forwarded to
        @type  porter:   L{Porter}
        @param mind:     the client-side reference of the logged-in streamer
        """
        self.avatarId = avatarId
        self.porter = porter

        # The underlying transport is now accessible as
        # self.mind.broker.transport, on which we can call sendFileDescriptor
        self.mind = mind

    def isAttached(self):
        """
        @returns: True for as long as the streamer has not logged out.
        """
        # Identity test on purpose: the mind is a remote reference, and we
        # only want to know whether logout() has reset it to None.
        return self.mind is not None

    def logout(self):
        """
        Forget the mind; called when the streamer's PB session ends.
        """
        self.debug("porter client %s logging out", self.avatarId)
        self.mind = None

    def perspective_registerPath(self, path):
        """
        Remotely callable: make this streamer the destination for C{path}.
        """
        self.log("Perspective called: registering path \"%s\"" % path)
        self.porter.registerPath(path, self)

    def perspective_deregisterPath(self, path):
        """
        Remotely callable: stop serving C{path} from this streamer.
        """
        self.log("Perspective called: deregistering path \"%s\"" % path)
        self.porter.deregisterPath(path, self)

    def perspective_registerPrefix(self, prefix):
        """
        Remotely callable: make this streamer the destination for every
        path starting with C{prefix}.
        """
        self.log("Perspective called: registering prefix \"%s\"" % prefix)
        self.porter.registerPrefix(prefix, self)

    def perspective_deregisterPrefix(self, prefix):
        """
        Remotely callable: stop serving C{prefix} from this streamer.
        """
        self.log("Perspective called: deregistering prefix \"%s\"" % prefix)
        self.porter.deregisterPrefix(prefix, self)

    def perspective_getPort(self):
        """
        Remotely callable.

        @returns: the porter's C{_iptablesPort} value.
        """
        return self.porter._iptablesPort
class PorterRealm(log.Loggable):
    """
    The realm of the porter's portal: it hands a L{PorterAvatar} to every
    streamer that logs into the porter.
    """
    implements(portal.IRealm)

    def __init__(self, porter):
        """
        @param porter: The porter that avatars created from here should use.
        @type  porter: L{Porter}
        """
        self.porter = porter

    def requestAvatar(self, avatarId, mind, *interfaces):
        """
        Create the avatar for a streamer that just authenticated.

        @returns: a (pb.IPerspective, avatar, logout callable) tuple
        @raises NotImplementedError: if pb.IPerspective was not among the
                                     requested interfaces
        """
        self.log("Avatar requested for avatarId %s, mind %r, interfaces %r",
                 avatarId, mind, interfaces)

        # Only PB perspectives are on offer; refuse anything else up front.
        if pb.IPerspective not in interfaces:
            raise NotImplementedError("no interface")

        streamerAvatar = PorterAvatar(avatarId, self.porter, mind)
        return pb.IPerspective, streamerAvatar, streamerAvatar.logout
class PorterMedium(component.BaseComponentMedium):
    """
    Component medium exposing the porter's connection details remotely.
    """

    def remote_getPorterDetails(self):
        """
        Report how to reach and log into the porter.

        @returns: the tuple (path, username, password, port, interface,
                  external-interface), read from the porter component.
        """
        porter = self.comp
        details = (
            porter._socketPath,
            porter._username,
            porter._password,
            porter._iptablesPort,
            porter._interface,
            porter._external_interface,
        )
        return details
118 -class Porter(component.BaseComponent, log.Loggable):
119 """ 120 The porter optionally sits in front of a set of streamer components. 121 The porter is what actually deals with incoming connections on a socket. 122 It decides which streamer to direct the connection to, then passes the FD 123 (along with some amount of already-read data) to the appropriate streamer. 124 """ 125 126 componentMediumClass = PorterMedium 127
128 - def init(self):
129 # We maintain a map of path -> avatar (the underlying transport is 130 # accessible from the avatar, we need this for FD-passing) 131 self._mappings = {} 132 self._prefixes = {} 133 134 self._socketlistener = None 135 136 self._socketPath = None 137 self._username = None 138 self._password = None 139 self._port = None 140 self._iptablesPort = None 141 self._porterProtocol = None 142 143 self._interface = '' 144 self._external_interface = ''
145
146 - def registerPath(self, path, avatar):
147 """ 148 Register a path as being served by a streamer represented by this 149 avatar. Will remove any previous registration at this path. 150 151 @param path: The path to register 152 @type path: str 153 @param avatar: The avatar representing the streamer to direct this path 154 to 155 @type avatar: L{PorterAvatar} 156 """ 157 self.debug("Registering porter path \"%s\" to %r" % (path, avatar)) 158 if path in self._mappings: 159 self.warning("Replacing existing mapping for path \"%s\"" % path) 160 161 self._mappings[path] = avatar
162
163 - def deregisterPath(self, path, avatar):
164 """ 165 Attempt to deregister the given path. A deregistration will only be 166 accepted if the mapping is to the avatar passed. 167 168 @param path: The path to deregister 169 @type path: str 170 @param avatar: The avatar representing the streamer being deregistered 171 @type avatar: L{PorterAvatar} 172 """ 173 if path in self._mappings: 174 if self._mappings[path] == avatar: 175 self.debug("Removing porter mapping for \"%s\"" % path) 176 del self._mappings[path] 177 else: 178 self.warning( 179 "Mapping not removed: refers to a different avatar") 180 else: 181 self.warning("Mapping not removed: no mapping found")
182
183 - def registerPrefix(self, prefix, avatar):
184 """ 185 Register a destination for all requests directed to anything beginning 186 with a specified prefix. Where there are multiple matching prefixes, 187 the longest is selected. 188 189 @param avatar: The avatar being registered 190 @type avatar: L{PorterAvatar} 191 """ 192 193 self.debug("Setting prefix \"%s\" for porter", prefix) 194 if prefix in self._prefixes: 195 self.warning("Overwriting prefix") 196 197 self._prefixes[prefix] = avatar
198
199 - def deregisterPrefix(self, prefix, avatar):
200 """ 201 Attempt to deregister a default destination for all requests not 202 directed to a specifically-mapped path. This will only succeed if the 203 default is currently equal to this avatar. 204 205 @param avatar: The avatar being deregistered 206 @type avatar: L{PorterAvatar} 207 """ 208 if prefix not in self._prefixes: 209 self.warning("Mapping not removed: no mapping found") 210 return 211 212 if self._prefixes[prefix] == avatar: 213 self.debug("Removing prefix destination from porter") 214 del self._prefixes[prefix] 215 else: 216 self.warning( 217 "Not removing prefix destination: expected avatar not found")
218
219 - def findPrefixMatch(self, path):
220 found = None 221 # TODO: Horribly inefficient. Replace with pathtree code. 222 for prefix in self._prefixes.keys(): 223 self.log("Checking: %r, %r" % (prefix, path)) 224 if (path.startswith(prefix) and 225 (not found or len(found) < len(prefix))): 226 found = prefix 227 if found: 228 return self._prefixes[found] 229 else: 230 return None
231
232 - def findDestination(self, path):
233 """ 234 Find a destination Avatar for this path. 235 @returns: The Avatar for this mapping, or None. 236 """ 237 238 if path in self._mappings: 239 return self._mappings[path] 240 else: 241 return self.findPrefixMatch(path)
242
243 - def generateSocketPath(self):
244 """ 245 Generate a socket pathname in an appropriate location 246 """ 247 # Also see worker/worker.py:_getSocketPath(), and note that 248 # this suffers from the same potential race. 249 import tempfile 250 fd, name = tempfile.mkstemp('.%d' % os.getpid(), 'flumotion.porter.') 251 os.close(fd) 252 253 return name
254
255 - def generateRandomString(self, numchars):
256 """ 257 Generate a random US-ASCII string of length numchars 258 """ 259 return ''.join(random.choice(string.ascii_letters) 260 for x in range(numchars))
261
262 - def have_properties(self):
263 props = self.config['properties'] 264 265 self.fixRenamedProperties(props, 266 [('socket_path', 'socket-path')]) 267 268 # We can operate in two modes: explicitly configured (neccesary if you 269 # want to handle connections from components in other managers), and 270 # self-configured (which is sufficient for slaving only streamers 271 # within this manager 272 if 'socket-path' in props: 273 # Explicitly configured 274 self._socketPath = props['socket-path'] 275 self._username = props['username'] 276 self._password = props['password'] 277 else: 278 # Self-configuring. Use a randomly create username/password, and 279 # a socket with a random name. 280 self._username = self.generateRandomString(12) 281 self._password = self.generateRandomString(12) 282 self._socketPath = self.generateSocketPath() 283 284 self._requirePassword = props.get('require-password', True) 285 self._socketMode = props.get('socket-mode', 0666) 286 self._port = int(props['port']) 287 self._iptablesPort = int(props.get('iptables-port', self._port)) 288 self._porterProtocol = props.get('protocol', 289 'flumotion.component.misc.porter.porter.HTTPPorterProtocol') 290 self._interface = props.get('interface', '') 291 # if a config has no external-interface set, set it to the same as 292 # interface 293 self._external_interface = props.get('external-interface', 294 self._interface)
295
296 - def do_stop(self):
297 d = None 298 if self._socketlistener: 299 # stopListening() calls (via a callLater) connectionLost(), which 300 # will unlink our socket, so we don't need to explicitly delete it. 301 d = self._socketlistener.stopListening() 302 self._socketlistener = None 303 return d
304
305 - def do_setup(self):
306 # Create our combined PB-server/fd-passing channel 307 self.have_properties() 308 realm = PorterRealm(self) 309 checker = checkers.FlexibleCredentialsChecker() 310 checker.addUser(self._username, self._password) 311 if not self._requirePassword: 312 checker.allowPasswordless(True) 313 314 p = portal.Portal(realm, [checker]) 315 serverfactory = pb.PBServerFactory(p) 316 317 try: 318 # Rather than a normal listenTCP() or listenUNIX(), we use 319 # listenWith so that we can specify our particular Port, which 320 # creates Transports that we know how to pass FDs over. 321 try: 322 os.unlink(self._socketPath) 323 except OSError: 324 pass 325 326 # listenWith is deprecated but the function never did much anyway 327 # 328 # self._socketlistener = reactor.listenWith( 329 # fdserver.FDPort, self._socketPath, 330 # serverfactory, mode=self._socketMode) 331 self._socketlistener = fdserver.FDPort(self._socketPath, 332 serverfactory, reactor=reactor, mode=self._socketMode) 333 self._socketlistener.startListening() 334 335 self.info("Now listening on socketPath %s", self._socketPath) 336 except error.CannotListenError: 337 self.warning("Failed to create socket %s" % self._socketPath) 338 m = messages.Error(T_(N_( 339 "Network error: socket path %s is not available."), 340 self._socketPath)) 341 self.addMessage(m) 342 self.setMood(moods.sad) 343 return defer.fail(errors.ComponentSetupHandledError()) 344 345 # Create the class that deals with the specific protocol we're proxying 346 # in this porter. 347 try: 348 proto = reflect.namedAny(self._porterProtocol) 349 self.debug("Created proto %r" % proto) 350 except (ImportError, AttributeError): 351 self.warning("Failed to import protocol '%s', defaulting to HTTP" % 352 self._porterProtocol) 353 proto = HTTPPorterProtocol 354 355 # And of course we also want to listen for incoming requests in the 356 # appropriate protocol (HTTP, RTSP, etc.) 
357 factory = PorterProtocolFactory(self, proto) 358 try: 359 p = fdserver.PassableServerPort(self._port, factory, 360 interface=self._interface, reactor=reactor) 361 p.startListening() 362 self.info("Now listening on interface %r on port %d", 363 self._interface, self._port) 364 except error.CannotListenError: 365 self.warning("Failed to listen on interface %r on port %d", 366 self._interface, self._port) 367 m = messages.Error(T_(N_( 368 "Network error: TCP port %d is not available."), self._port)) 369 self.addMessage(m) 370 self.setMood(moods.sad) 371 return defer.fail(errors.ComponentSetupHandledError())
372 373
class PorterProtocolFactory(protocol.Factory):
    """
    Factory building one porter protocol instance per incoming connection,
    each of them bound to the same porter.
    """

    def __init__(self, porter, protocol):
        """
        @param porter:   the porter handed to every protocol instance
        @param protocol: the protocol class to instantiate
        """
        self.protocol = protocol
        self._porter = porter

    def buildProtocol(self, addr):
        """
        Instantiate the configured protocol class with our porter.
        """
        instance = self.protocol(self._porter)
        instance.factory = self
        return instance
386 -class PorterProtocol(protocol.Protocol, log.Loggable):
387 """ 388 The base porter is capable of accepting HTTP-like protocols (including 389 RTSP) - it reads the first line of a request, and makes the decision 390 solely on that. 391 392 We can't guarantee that we read precisely a line, so the buffer we 393 accumulate will actually be larger than what we actually parse. 394 395 @cvar MAX_SIZE: the maximum number of bytes allowed for the first line 396 @cvar delimiters: a list of valid line delimiters I check for 397 """ 398 399 logCategory = 'porterprotocol' 400 401 # Don't permit a first line longer than this. 402 MAX_SIZE = 4096 403 404 # Timeout any client connected to the porter for longer than this. A normal 405 # client should only ever be connected for a fraction of a second. 406 PORTER_CLIENT_TIMEOUT = 30 407 408 # In fact, because we check \r, we'll never need to check for \r\n - we 409 # leave this in as \r\n is the more correct form. At the other end, this 410 # gets processed by a full protocol implementation, so being flexible hurts 411 # us not at all 412 delimiters = ['\r\n', '\n', '\r'] 413
414 - def __init__(self, porter):
415 self._buffer = '' 416 self._porter = porter 417 self.requestId = None # a string that should identify the request 418 419 self._timeoutDC = reactor.callLater(self.PORTER_CLIENT_TIMEOUT, 420 self._timeout)
421
422 - def connectionMade(self):
423 424 self.requestId = self.generateRequestId() 425 # PROBE: accepted connection 426 self.debug("[fd %5d] (ts %f) (request-id %r) accepted connection", 427 self.transport.fileno(), time.time(), self.requestId) 428 429 protocol.Protocol.connectionMade(self)
430
431 - def _timeout(self):
432 self._timeoutDC = None 433 self.debug("Timing out porter client after %d seconds", 434 self.PORTER_CLIENT_TIMEOUT) 435 self.transport.loseConnection()
436
437 - def connectionLost(self, reason):
438 if self._timeoutDC: 439 self._timeoutDC.cancel() 440 self._timeoutDC = None
441
442 - def dataReceived(self, data):
443 self._buffer = self._buffer + data 444 self.log("Got data, buffer now \"%s\"" % self._buffer) 445 # We accept more than just '\r\n' (the true HTTP line end) in the 446 # interests of compatibility. 447 for delim in self.delimiters: 448 try: 449 line, remaining = self._buffer.split(delim, 1) 450 break 451 except ValueError: 452 # We didn't find this delimiter; continue with the others. 453 pass 454 else: 455 # Failed to find a valid delimiter. 456 self.log("No valid delimiter found") 457 if len(self._buffer) > self.MAX_SIZE: 458 459 # PROBE: dropping 460 self.debug("[fd %5d] (ts %f) (request-id %r) dropping, " 461 "buffer exceeded", 462 self.transport.fileno(), time.time(), 463 self.requestId) 464 465 return self.transport.loseConnection() 466 else: 467 # No delimiter found; haven't reached the length limit yet. 468 # Wait for more data. 469 return 470 471 # Got a line. self._buffer is still our entire buffer, should be 472 # provided to the slaved process. 473 parsed = self.parseLine(line) 474 if not parsed: 475 self.log("Couldn't parse the first line") 476 return self.transport.loseConnection() 477 478 identifier = self.extractIdentifier(parsed) 479 if not identifier: 480 self.log("Couldn't find identifier in first line") 481 return self.transport.loseConnection() 482 483 if self.requestId: 484 self.log("Injecting request-id %r", self.requestId) 485 parsed = self.injectRequestId(parsed, self.requestId) 486 # Since injecting the token might have modified the parsed 487 # representation of the request, we need to reconstruct the buffer. 488 # Fortunately, we know what delimiter did we split on, what's the 489 # remaining part and that we only split the buffer in two parts 490 self._buffer = delim.join((self.unparseLine(parsed), remaining)) 491 492 # PROBE: request 493 self.debug("[fd %5d] (ts %f) (request-id %r) identifier %s", 494 self.transport.fileno(), time.time(), self.requestId, 495 identifier) 496 497 # Ok, we have an identifier. 
Is it one we know about, or do we have 498 # a default destination? 499 destinationAvatar = self._porter.findDestination(identifier) 500 501 if not destinationAvatar or not destinationAvatar.isAttached(): 502 if destinationAvatar: 503 self.debug("There was an avatar, but it logged out?") 504 505 # PROBE: no destination; see send fd 506 self.debug( 507 "[fd %5d] (ts %f) (request-id %r) no destination avatar found", 508 self.transport.fileno(), time.time(), self.requestId) 509 510 self.writeNotFoundResponse() 511 return self.transport.loseConnection() 512 513 # Transfer control over this FD. Pass all the data so-far received 514 # along in the same message. The receiver will push that data into 515 # the Twisted Protocol object as if it had been normally received, 516 # so it looks to the receiver like it has read the entire data stream 517 # itself. 518 519 # PROBE: send fd; see no destination and fdserver.py 520 self.debug("[fd %5d] (ts %f) (request-id %r) send fd to avatarId %s", 521 self.transport.fileno(), time.time(), self.requestId, 522 destinationAvatar.avatarId) 523 524 # TODO: Check out blocking characteristics of sendFileDescriptor, fix 525 # if it blocks. 526 try: 527 destinationAvatar.mind.broker.transport.sendFileDescriptor( 528 self.transport.fileno(), self._buffer) 529 except OSError, e: 530 self.warning("[fd %5d] failed to send FD: %s", 531 self.transport.fileno(), log.getExceptionMessage(e)) 532 self.writeServiceUnavailableResponse() 533 return self.transport.loseConnection() 534 535 # PROBE: sent fd; see no destination and fdserver.py 536 self.debug("[fd %5d] (ts %f) (request-id %r) sent fd to avatarId %s", 537 self.transport.fileno(), time.time(), self.requestId, 538 destinationAvatar.avatarId) 539 540 # After this, we don't want to do anything with the FD, other than 541 # close our reference to it - but not close the actual TCP connection. 
542 # We set keepSocketAlive to make loseConnection() only call close() 543 # rather than shutdown() then close() 544 self.transport.keepSocketAlive = True 545 self.transport.loseConnection()
546
547 - def parseLine(self, line):
548 """ 549 Parse the initial line of the request. Return an object that can be 550 used to uniquely identify the stream being requested by passing it to 551 extractIdentifier, or None if the request is unreadable. 552 553 Subclasses should override this. 554 """ 555 raise NotImplementedError
556
557 - def unparseLine(self, parsed):
558 """ 559 Recreate the initial request line from the parsed representation. The 560 recreated line does not need to be exactly identical, but both 561 parsedLine(unparseLine(line)) and line should contain the same 562 information (i.e. unparseLine should not lose information). 563 564 UnparseLine has to return a valid line from the porter protocol's 565 scheme point of view (for instance, HTTP). 566 567 Subclasses should override this. 568 """ 569 raise NotImplementedError
570
571 - def extractIdentifier(self, parsed):
572 """ 573 Extract a string that uniquely identifies the requested stream from the 574 parsed representation of the first request line. 575 576 Subclasses should override this, depending on how they implemented 577 parseLine. 578 """ 579 raise NotImplementedError
580
581 - def generateRequestId(self):
582 """ 583 Return a string that will uniquely identify the request. 584 585 Subclasses should override this if they want to use request-ids and 586 also implement injectRequestId. 587 """ 588 raise NotImplementedError
589
590 - def injectRequestId(self, parsed, requestId):
591 """ 592 Take the parsed representation of the first request line and a string 593 token, return a parsed representation of the request line with the 594 request-id possibly mixed into it. 595 596 Subclasses should override this if they generate request-ids. 597 """ 598 # by default, ignore the request-id 599 return parsed
600
601 - def writeNotFoundResponse(self):
602 """ 603 Write a response indicating that the requested resource was not found 604 in this protocol. 605 606 Subclasses should override this to use the correct protocol. 607 """ 608 raise NotImplementedError
609
611 """ 612 Write a response indicating that the requested resource was 613 temporarily uavailable in this protocol. 614 615 Subclasses should override this to use the correct protocol. 616 """ 617 raise NotImplementedError
618 619
class HTTPPorterProtocol(PorterProtocol):
    """
    Porter protocol for HTTP: requests are routed on the path of the URL in
    the request line, and a random request-id is added to the query string.
    """
    scheme = 'http'
    protos = ["HTTP/1.0", "HTTP/1.1"]
    requestIdParameter = 'FLUREQID'
    requestIdBitsNo = 256

    def parseLine(self, line):
        """
        Split a request line into its method, location and protocol fields.

        @returns: a (method, parsed_url, proto) tuple, or None if the line
                  does not have three fields or the protocol is not one of
                  C{protos}.
        """
        try:
            # Unpacking raises ValueError when there are fewer than three
            # fields; that is reported as an unreadable request below.
            (method, location, proto) = [
                field.strip() for field in line.split(' ', 2)]

            if proto not in self.protos:
                return None

            # Currently, we just use the URL parsing code from urllib2
            parsed_url = urlparse.urlparse(location)

            return method, parsed_url, proto

        except ValueError:
            return None

    def unparseLine(self, parsed):
        """
        Rebuild the request line from what parseLine (or injectRequestId)
        returned.
        """
        method, parsed_url, proto = parsed
        return ' '.join((method, urlparse.urlunparse(parsed_url), proto))

    def generateRequestId(self):
        """
        @returns: C{requestIdBitsNo} random bits, formatted as hexadecimal
                  digits.
        """
        # Remember to return something that does not need quoting to be put in
        # a GET parameter. This way we spare ourselves the effort of quoting in
        # injectRequestId.
        # Format with %x rather than slicing hex(): for a long, hex() appends
        # an 'L' suffix that would end up in the request-id.
        return '%x' % random.getrandbits(self.requestIdBitsNo)

    def injectRequestId(self, parsed, requestId):
        """
        Append C{requestIdParameter}=C{requestId} to the query string of the
        parsed URL.

        @returns: a (method, parsed_url, proto) tuple; parsed_url is a plain
                  tuple with the same six fields as the one passed in.
        """
        method, parsed_url, proto = parsed
        # assuming no need to escape the requestId, see generateRequestId
        sep = ''
        if parsed_url[4] != '':
            # There already is a query string; chain our parameter to it.
            sep = '&'
        query_string = ''.join((parsed_url[4],
                                sep, self.requestIdParameter, '=',
                                requestId))
        parsed_url = (parsed_url[:4] +
                      (query_string, )
                      + parsed_url[5:])
        return method, parsed_url, proto

    def extractIdentifier(self, parsed):
        """
        @returns: the path component of the requested URL.
        """
        method, parsed_url, proto = parsed
        # Currently, we just return the path part of the URL.
        return parsed_url[2]

    def writeNotFoundResponse(self):
        self.transport.write("HTTP/1.0 404 Not Found\r\n\r\nResource unknown")

    def writeServiceUnavailableResponse(self):
        self.transport.write("HTTP/1.0 503 Service Unavailable\r\n\r\n"
                             "Service temporarily unavailable")
class RTSPPorterProtocol(HTTPPorterProtocol):
    """
    Porter protocol for RTSP; request lines are handled exactly as for HTTP,
    only the accepted protocol version and the error responses differ.
    """
    scheme = 'rtsp'
    protos = ["RTSP/1.0"]

    def writeNotFoundResponse(self):
        # Status line, empty header block, then a short plain-text body.
        response = "RTSP/1.0 404 Not Found\r\n\r\nResource unknown"
        self.transport.write(response)

    def writeServiceUnavailableResponse(self):
        # Status line, empty header block, then a short plain-text body.
        response = ("RTSP/1.0 503 Service Unavailable\r\n\r\n"
                    "Service temporarily unavailable")
        self.transport.write(response)