Apache/2.4.7 (Ubuntu) Linux sman1baleendah 3.13.0-24-generic #46-Ubuntu SMP Thu Apr 10 19:11:08 UTC 2014 x86_64 uid=33(www-data) gid=33(www-data) groups=33(www-data) safemode : OFF MySQL: ON | Perl: ON | cURL: OFF | WGet: ON > / usr / lib / python2.7 / dist-packages / twisted / internet / test / | server ip : 172.67.156.115 your ip : 172.69.7.205 H O M E |
Filename | /usr/lib/python2.7/dist-packages/twisted/internet/test/test_epollreactor.py |
Size | 7.21 kb |
Permission | rw-r--r-- |
Owner | root : root |
Create time | 27-Apr-2025 09:56 |
Last modified | 04-Jul-2013 01:40 |
Last accessed | 07-Jul-2025 07:23 |
Actions | edit | rename | delete | download (gzip) |
View | text | code | image |
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
Tests for L{twisted.internet.epollreactor}.
"""
from __future__ import division, absolute_import
from twisted.trial.unittest import TestCase
try:
from twisted.internet.epollreactor import _ContinuousPolling
except ImportError:
_ContinuousPolling = None
from twisted.internet.task import Clock
from twisted.internet.error import ConnectionDone
class Descriptor(object):
    """
    Records reads and writes, as if it were a C{FileDescriptor}.

    @ivar events: The names of the events seen so far, in the order they
        happened.  Each entry is one of C{"read"}, C{"write"} or C{"lost"}.
    """

    def __init__(self):
        self.events = []


    def fileno(self):
        """
        Return a fixed placeholder file descriptor number.
        """
        return 1


    def doRead(self):
        """
        Record that a read was requested.
        """
        self.events.append("read")


    def doWrite(self):
        """
        Record that a write was requested.
        """
        self.events.append("write")


    def connectionLost(self, reason):
        """
        Record that the connection was lost.

        @param reason: The object describing why the connection went away.
            It is checked with C{reason.trap(ConnectionDone)} before the
            event is recorded, so the event is only recorded if that call
            returns normally.
        """
        reason.trap(ConnectionDone)
        self.events.append("lost")
class ContinuousPollingTests(TestCase):
    """
    L{_ContinuousPolling} can be used to read and write from C{FileDescriptor}
    objects.
    """

    def test_addReader(self):
        """
        Adding a reader when there was previously no reader starts up a
        C{LoopingCall}.
        """
        poller = _ContinuousPolling(Clock())
        self.assertIsNone(poller._loop)
        reader = object()
        self.assertFalse(poller.isReading(reader))
        poller.addReader(reader)
        self.assertIsNotNone(poller._loop)
        self.assertTrue(poller._loop.running)
        # The loop must be driven by the poller's own reactor.
        self.assertIs(poller._loop.clock, poller._reactor)
        self.assertTrue(poller.isReading(reader))


    def test_addWriter(self):
        """
        Adding a writer when there was previously no writer starts up a
        C{LoopingCall}.
        """
        poller = _ContinuousPolling(Clock())
        self.assertIsNone(poller._loop)
        writer = object()
        self.assertFalse(poller.isWriting(writer))
        poller.addWriter(writer)
        self.assertIsNotNone(poller._loop)
        self.assertTrue(poller._loop.running)
        # The loop must be driven by the poller's own reactor.
        self.assertIs(poller._loop.clock, poller._reactor)
        self.assertTrue(poller.isWriting(writer))


    def test_removeReader(self):
        """
        Removing a reader stops the C{LoopingCall}.
        """
        poller = _ContinuousPolling(Clock())
        reader = object()
        poller.addReader(reader)
        poller.removeReader(reader)
        self.assertIsNone(poller._loop)
        # Nothing may be left scheduled on the clock once the loop stops.
        self.assertEqual(poller._reactor.getDelayedCalls(), [])
        self.assertFalse(poller.isReading(reader))


    def test_removeWriter(self):
        """
        Removing a writer stops the C{LoopingCall}.
        """
        poller = _ContinuousPolling(Clock())
        writer = object()
        poller.addWriter(writer)
        poller.removeWriter(writer)
        self.assertIsNone(poller._loop)
        # Nothing may be left scheduled on the clock once the loop stops.
        self.assertEqual(poller._reactor.getDelayedCalls(), [])
        self.assertFalse(poller.isWriting(writer))


    def test_removeUnknown(self):
        """
        Removing unknown readers and writers silently does nothing.
        """
        poller = _ContinuousPolling(Clock())
        poller.removeWriter(object())
        poller.removeReader(object())


    def test_multipleReadersAndWriters(self):
        """
        Adding multiple readers and writers results in a single
        C{LoopingCall}.
        """
        poller = _ContinuousPolling(Clock())
        writer = object()
        poller.addWriter(writer)
        self.assertIsNotNone(poller._loop)
        poller.addWriter(object())
        self.assertIsNotNone(poller._loop)
        poller.addReader(object())
        self.assertIsNotNone(poller._loop)
        poller.addReader(object())
        # Other descriptors remain, so removing one must not stop the loop.
        poller.removeWriter(writer)
        self.assertIsNotNone(poller._loop)
        self.assertTrue(poller._loop.running)
        self.assertEqual(len(poller._reactor.getDelayedCalls()), 1)


    def test_readerPolling(self):
        """
        Adding a reader causes its C{doRead} to be called once each time the
        clock is advanced, here in steps of 10 microseconds.
        """
        reactor = Clock()
        poller = _ContinuousPolling(reactor)
        desc = Descriptor()
        poller.addReader(desc)
        # Nothing is read until time passes.
        self.assertEqual(desc.events, [])
        reactor.advance(0.00001)
        self.assertEqual(desc.events, ["read"])
        reactor.advance(0.00001)
        self.assertEqual(desc.events, ["read", "read"])
        reactor.advance(0.00001)
        self.assertEqual(desc.events, ["read", "read", "read"])


    def test_writerPolling(self):
        """
        Adding a writer causes its C{doWrite} to be called once each time the
        clock is advanced, here in steps of 1 millisecond.
        """
        reactor = Clock()
        poller = _ContinuousPolling(reactor)
        desc = Descriptor()
        poller.addWriter(desc)
        # Nothing is written until time passes.
        self.assertEqual(desc.events, [])
        reactor.advance(0.001)
        self.assertEqual(desc.events, ["write"])
        reactor.advance(0.001)
        self.assertEqual(desc.events, ["write", "write"])
        reactor.advance(0.001)
        self.assertEqual(desc.events, ["write", "write", "write"])


    def test_connectionLostOnRead(self):
        """
        If a C{doRead} returns a value indicating disconnection,
        C{connectionLost} is called on it.
        """
        reactor = Clock()
        poller = _ContinuousPolling(reactor)
        desc = Descriptor()
        # Replace the recording doRead with one that reports a disconnect.
        desc.doRead = lambda: ConnectionDone()
        poller.addReader(desc)
        self.assertEqual(desc.events, [])
        reactor.advance(0.001)
        self.assertEqual(desc.events, ["lost"])


    def test_connectionLostOnWrite(self):
        """
        If a C{doWrite} returns a value indicating disconnection,
        C{connectionLost} is called on it.
        """
        reactor = Clock()
        poller = _ContinuousPolling(reactor)
        desc = Descriptor()
        # Replace the recording doWrite with one that reports a disconnect.
        desc.doWrite = lambda: ConnectionDone()
        poller.addWriter(desc)
        self.assertEqual(desc.events, [])
        reactor.advance(0.001)
        self.assertEqual(desc.events, ["lost"])


    def test_removeAll(self):
        """
        L{_ContinuousPolling.removeAll} removes all descriptors and returns
        the readers and writers.
        """
        poller = _ContinuousPolling(Clock())
        reader = object()
        writer = object()
        both = object()
        poller.addReader(reader)
        poller.addReader(both)
        poller.addWriter(writer)
        poller.addWriter(both)
        removed = poller.removeAll()
        self.assertEqual(poller.getReaders(), [])
        self.assertEqual(poller.getWriters(), [])
        # "both" was registered twice but must be reported only once.
        self.assertEqual(len(removed), 3)
        self.assertEqual(set(removed), set([reader, writer, both]))


    def test_getReaders(self):
        """
        L{_ContinuousPolling.getReaders} returns a list of the read
        descriptors.
        """
        poller = _ContinuousPolling(Clock())
        reader = object()
        poller.addReader(reader)
        self.assertIn(reader, poller.getReaders())


    def test_getWriters(self):
        """
        L{_ContinuousPolling.getWriters} returns a list of the write
        descriptors.
        """
        poller = _ContinuousPolling(Clock())
        writer = object()
        poller.addWriter(writer)
        self.assertIn(writer, poller.getWriters())

    # _ContinuousPolling is None when importing it from the epoll reactor
    # module failed; mark every test in this case as skipped in that event.
    if _ContinuousPolling is None:
        skip = "epoll not supported in this environment."
# See LICENSE for details.
"""
Tests for L{twisted.internet.epollreactor}.
"""
from __future__ import division, absolute_import
from twisted.trial.unittest import TestCase
try:
from twisted.internet.epollreactor import _ContinuousPolling
except ImportError:
_ContinuousPolling = None
from twisted.internet.task import Clock
from twisted.internet.error import ConnectionDone
class Descriptor(object):
    """
    Records reads and writes, as if it were a C{FileDescriptor}.

    @ivar events: The names of the events seen so far, in the order they
        happened.  Each entry is one of C{"read"}, C{"write"} or C{"lost"}.
    """

    def __init__(self):
        self.events = []


    def fileno(self):
        """
        Return a fixed placeholder file descriptor number.
        """
        return 1


    def doRead(self):
        """
        Record that a read was requested.
        """
        self.events.append("read")


    def doWrite(self):
        """
        Record that a write was requested.
        """
        self.events.append("write")


    def connectionLost(self, reason):
        """
        Record that the connection was lost.

        @param reason: The object describing why the connection went away.
            It is checked with C{reason.trap(ConnectionDone)} before the
            event is recorded, so the event is only recorded if that call
            returns normally.
        """
        reason.trap(ConnectionDone)
        self.events.append("lost")
class ContinuousPollingTests(TestCase):
    """
    L{_ContinuousPolling} can be used to read and write from C{FileDescriptor}
    objects.
    """

    def test_addReader(self):
        """
        Adding a reader when there was previously no reader starts up a
        C{LoopingCall}.
        """
        poller = _ContinuousPolling(Clock())
        self.assertIsNone(poller._loop)
        reader = object()
        self.assertFalse(poller.isReading(reader))
        poller.addReader(reader)
        self.assertIsNotNone(poller._loop)
        self.assertTrue(poller._loop.running)
        # The loop must be driven by the poller's own reactor.
        self.assertIs(poller._loop.clock, poller._reactor)
        self.assertTrue(poller.isReading(reader))


    def test_addWriter(self):
        """
        Adding a writer when there was previously no writer starts up a
        C{LoopingCall}.
        """
        poller = _ContinuousPolling(Clock())
        self.assertIsNone(poller._loop)
        writer = object()
        self.assertFalse(poller.isWriting(writer))
        poller.addWriter(writer)
        self.assertIsNotNone(poller._loop)
        self.assertTrue(poller._loop.running)
        # The loop must be driven by the poller's own reactor.
        self.assertIs(poller._loop.clock, poller._reactor)
        self.assertTrue(poller.isWriting(writer))


    def test_removeReader(self):
        """
        Removing a reader stops the C{LoopingCall}.
        """
        poller = _ContinuousPolling(Clock())
        reader = object()
        poller.addReader(reader)
        poller.removeReader(reader)
        self.assertIsNone(poller._loop)
        # Nothing may be left scheduled on the clock once the loop stops.
        self.assertEqual(poller._reactor.getDelayedCalls(), [])
        self.assertFalse(poller.isReading(reader))


    def test_removeWriter(self):
        """
        Removing a writer stops the C{LoopingCall}.
        """
        poller = _ContinuousPolling(Clock())
        writer = object()
        poller.addWriter(writer)
        poller.removeWriter(writer)
        self.assertIsNone(poller._loop)
        # Nothing may be left scheduled on the clock once the loop stops.
        self.assertEqual(poller._reactor.getDelayedCalls(), [])
        self.assertFalse(poller.isWriting(writer))


    def test_removeUnknown(self):
        """
        Removing unknown readers and writers silently does nothing.
        """
        poller = _ContinuousPolling(Clock())
        poller.removeWriter(object())
        poller.removeReader(object())


    def test_multipleReadersAndWriters(self):
        """
        Adding multiple readers and writers results in a single
        C{LoopingCall}.
        """
        poller = _ContinuousPolling(Clock())
        writer = object()
        poller.addWriter(writer)
        self.assertIsNotNone(poller._loop)
        poller.addWriter(object())
        self.assertIsNotNone(poller._loop)
        poller.addReader(object())
        self.assertIsNotNone(poller._loop)
        poller.addReader(object())
        # Other descriptors remain, so removing one must not stop the loop.
        poller.removeWriter(writer)
        self.assertIsNotNone(poller._loop)
        self.assertTrue(poller._loop.running)
        self.assertEqual(len(poller._reactor.getDelayedCalls()), 1)


    def test_readerPolling(self):
        """
        Adding a reader causes its C{doRead} to be called once each time the
        clock is advanced, here in steps of 10 microseconds.
        """
        reactor = Clock()
        poller = _ContinuousPolling(reactor)
        desc = Descriptor()
        poller.addReader(desc)
        # Nothing is read until time passes.
        self.assertEqual(desc.events, [])
        reactor.advance(0.00001)
        self.assertEqual(desc.events, ["read"])
        reactor.advance(0.00001)
        self.assertEqual(desc.events, ["read", "read"])
        reactor.advance(0.00001)
        self.assertEqual(desc.events, ["read", "read", "read"])


    def test_writerPolling(self):
        """
        Adding a writer causes its C{doWrite} to be called once each time the
        clock is advanced, here in steps of 1 millisecond.
        """
        reactor = Clock()
        poller = _ContinuousPolling(reactor)
        desc = Descriptor()
        poller.addWriter(desc)
        # Nothing is written until time passes.
        self.assertEqual(desc.events, [])
        reactor.advance(0.001)
        self.assertEqual(desc.events, ["write"])
        reactor.advance(0.001)
        self.assertEqual(desc.events, ["write", "write"])
        reactor.advance(0.001)
        self.assertEqual(desc.events, ["write", "write", "write"])


    def test_connectionLostOnRead(self):
        """
        If a C{doRead} returns a value indicating disconnection,
        C{connectionLost} is called on it.
        """
        reactor = Clock()
        poller = _ContinuousPolling(reactor)
        desc = Descriptor()
        # Replace the recording doRead with one that reports a disconnect.
        desc.doRead = lambda: ConnectionDone()
        poller.addReader(desc)
        self.assertEqual(desc.events, [])
        reactor.advance(0.001)
        self.assertEqual(desc.events, ["lost"])


    def test_connectionLostOnWrite(self):
        """
        If a C{doWrite} returns a value indicating disconnection,
        C{connectionLost} is called on it.
        """
        reactor = Clock()
        poller = _ContinuousPolling(reactor)
        desc = Descriptor()
        # Replace the recording doWrite with one that reports a disconnect.
        desc.doWrite = lambda: ConnectionDone()
        poller.addWriter(desc)
        self.assertEqual(desc.events, [])
        reactor.advance(0.001)
        self.assertEqual(desc.events, ["lost"])


    def test_removeAll(self):
        """
        L{_ContinuousPolling.removeAll} removes all descriptors and returns
        the readers and writers.
        """
        poller = _ContinuousPolling(Clock())
        reader = object()
        writer = object()
        both = object()
        poller.addReader(reader)
        poller.addReader(both)
        poller.addWriter(writer)
        poller.addWriter(both)
        removed = poller.removeAll()
        self.assertEqual(poller.getReaders(), [])
        self.assertEqual(poller.getWriters(), [])
        # "both" was registered twice but must be reported only once.
        self.assertEqual(len(removed), 3)
        self.assertEqual(set(removed), set([reader, writer, both]))


    def test_getReaders(self):
        """
        L{_ContinuousPolling.getReaders} returns a list of the read
        descriptors.
        """
        poller = _ContinuousPolling(Clock())
        reader = object()
        poller.addReader(reader)
        self.assertIn(reader, poller.getReaders())


    def test_getWriters(self):
        """
        L{_ContinuousPolling.getWriters} returns a list of the write
        descriptors.
        """
        poller = _ContinuousPolling(Clock())
        writer = object()
        poller.addWriter(writer)
        self.assertIn(writer, poller.getWriters())

    # _ContinuousPolling is None when importing it from the epoll reactor
    # module failed; mark every test in this case as skipped in that event.
    if _ContinuousPolling is None:
        skip = "epoll not supported in this environment."