Thanks to Martijn Pieters' answer, I stopped working around python-requests' behavior and looked for a completely different approach.
I ended up using pyCurl. You can use it similar to a select+recv loop without inverting the control flow and giving up control to a dedicated IO-loop as in Tornado, etc. This way it is easy to use a generator that yields new lines as soon as they arrive - without further buffering in intermediate layers that could introduce delay or additional threads that run the IO-loop.
At the same time, it is high-level enough that you don't need to bother with chunked transfer encoding, SSL encryption or gzip compression.
This was my old code, where `chunk_size=1` resulted in 45% CPU load and `chunk_size>1` introduced additional lag.
import requests
class RequestsHTTPStream(object):
    """Line-oriented reader for a streaming HTTP response, via python-requests."""

    def __init__(self, url):
        """Store *url*; the request itself is only issued by iter_lines()."""
        self.url = url

    def iter_lines(self):
        """Issue a streaming GET for ``self.url`` and return its line iterator.

        The request advertises ``text/event-stream`` and gzip support and asks
        intermediaries not to cache the response.
        """
        request_headers = {
            'Cache-Control': 'no-cache',
            'Accept': 'text/event-stream',
            'Accept-Encoding': 'gzip',
        }
        stream = requests.get(self.url, headers=request_headers, stream=True)
        # The body is consumed one byte at a time (chunk_size=1).
        return stream.iter_lines(chunk_size=1)
Here is my new code based on pyCurl:
(Unfortunately, the curl_easy_* style `perform` blocks completely, which makes it difficult to yield lines in between without using threads. Thus I'm using the curl_multi_* methods.)
import pycurl
import urllib2
import httplib
import StringIO
class CurlHTTPStream(object):
    """Line-oriented reader for a streaming HTTP response, built on pycurl.

    The transfer is driven through pycurl's ``CurlMulti`` interface, so the
    caller keeps control of the loop: ``iter_lines()`` returns a generator
    that yields every line as soon as it has been received, without a
    background thread or a dedicated IO loop.

    This is Python 2 code (it relies on ``StringIO``, ``urllib2`` and
    ``httplib``).
    """

    # Upper bound, in seconds, for one wait on socket activity.
    SELECT_TIMEOUT = 10

    def __init__(self, url):
        """Prepare (but do not start) a GET transfer for *url*."""
        self.url = url
        # Body data handed over by libcurl accumulates here until it is
        # drained by _get_received_data().
        self.received_buffer = StringIO.StringIO()
        self.curl = pycurl.Curl()
        self.curl.setopt(pycurl.URL, url)
        self.curl.setopt(pycurl.HTTPHEADER, ['Cache-Control: no-cache', 'Accept: text/event-stream'])
        self.curl.setopt(pycurl.ENCODING, 'gzip')
        self.curl.setopt(pycurl.CONNECTTIMEOUT, 5)
        self.curl.setopt(pycurl.WRITEFUNCTION, self.received_buffer.write)
        self.curlmulti = pycurl.CurlMulti()
        self.curlmulti.add_handle(self.curl)
        # 0 means "no HTTP status seen yet"; see _check_status_code().
        self.status_code = 0

    def _any_data_received(self):
        """Return True if the buffer holds data written since the last drain."""
        return self.received_buffer.tell() != 0

    def _get_received_data(self):
        """Return everything buffered so far and reset the buffer to empty."""
        result = self.received_buffer.getvalue()
        self.received_buffer.truncate(0)
        self.received_buffer.seek(0)
        return result

    def _check_status_code(self):
        """Raise ``urllib2.HTTPError`` if a status other than 200 was received.

        The status is queried from curl until a non-zero value is seen and is
        cached in ``self.status_code`` from then on. While it is still 0
        nothing is raised.
        """
        if self.status_code == 0:
            self.status_code = self.curl.getinfo(pycurl.HTTP_CODE)
        if self.status_code != 0 and self.status_code != httplib.OK:
            raise urllib2.HTTPError(self.url, self.status_code, None, None, None)

    def _perform_on_curl(self):
        """Run ``CurlMulti.perform()`` until it no longer asks to be re-called.

        Returns the number of handles reported by the last ``perform()`` call.
        """
        while True:
            ret, num_handles = self.curlmulti.perform()
            if ret != pycurl.E_CALL_MULTI_PERFORM:
                break
        return num_handles

    def _iter_chunks(self):
        """Yield the received body data piece by piece as it arrives.

        Each iteration lets curl make progress, yields whatever has been
        buffered (after checking the HTTP status), and then waits up to
        ``SELECT_TIMEOUT`` seconds for more activity. Once no handle remains
        active, the status code and curl's error queue are checked one last
        time.

        Raises:
            urllib2.HTTPError: the response status is known and is not 200.
            pycurl.error: curl reported a failed transfer.
        """
        while True:
            remaining = self._perform_on_curl()
            if self._any_data_received():
                self._check_status_code()
                yield self._get_received_data()
            if remaining == 0:
                break
            self.curlmulti.select(self.SELECT_TIMEOUT)
        # Covers responses that ended without delivering any body data.
        self._check_status_code()
        self._check_curl_errors()

    def _check_curl_errors(self):
        """Raise ``pycurl.error`` for the first failure queued by ``info_read()``."""
        # Element [2] of info_read()'s result is iterated as the failures;
        # everything after an entry's first item becomes the error arguments.
        for f in self.curlmulti.info_read()[2]:
            raise pycurl.error(*f[1:])

    def iter_lines(self):
        """Return a generator over the lines of the response body.

        Lines are yielded without their terminators, as soon as they are
        complete.
        """
        chunks = self._iter_chunks()
        return self._split_lines_from_chunks(chunks)

    @staticmethod
    def _split_lines_from_chunks(chunks):
        """Re-assemble arbitrarily split *chunks* into complete lines.

        Modelled on requests' ``Response.iter_lines()``: the last line of a
        chunk is held back in ``pending`` when the chunk does not end with a
        line terminator, and is prepended to the next chunk.

        Unlike that implementation, a CRLF terminator that is split across two
        chunks (``"...\\r"`` followed by ``"\\n..."``) is treated as a single
        line end instead of producing a spurious empty line. This matters for
        ``text/event-stream``, where an empty line terminates an event.
        """
        pending = None
        # True when the previous chunk ended in '\r': a leading '\n' in the
        # next chunk is then the second half of one CRLF terminator.
        skip_lf = False
        for chunk in chunks:
            if skip_lf and chunk[:1] == '\n':
                chunk = chunk[1:]
                skip_lf = False
            if not chunk:
                # Nothing new to split; keep `pending` and `skip_lf` as is.
                continue
            skip_lf = chunk[-1:] == '\r'
            if pending is not None:
                chunk = pending + chunk
            lines = chunk.splitlines()
            # splitlines() strips terminators, so the last line ending in the
            # same character as the chunk means that line is still incomplete.
            if lines and lines[-1] and lines[-1][-1] == chunk[-1]:
                pending = lines.pop()
            else:
                pending = None
            for line in lines:
                yield line
        if pending is not None:
            yield pending
This code tries to fetch as many bytes as possible from the incoming stream, without blocking unnecessarily if there are only a few. In comparison, the CPU load is around 0.2%.