| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008 |
- #!/usr/bin/env python
- #
- # SPDX-FileCopyrightText: 2018-2022 Espressif Systems (Shanghai) CO LTD
- # SPDX-License-Identifier: Apache-2.0
- # Utility for testing the web server. Test cases:
- # Assume the device supports 'n' simultaneous open sockets
- #
- # HTTP Server Tests
- #
- # 0. Firmware Settings:
- # - Create a dormant thread whose sole job is to call httpd_stop() when instructed
- # - Measure the following before httpd_start() is called:
- # - current free memory
- # - current free sockets
- # - Measure the same whenever httpd_stop is called
- # - Register maximum possible URI handlers: should be successful
- # - Register one more URI handler: should fail
- # - Deregister one URI handler: should be successful
- # - Register one more URI handler: should succeed
- # - Register separate handlers for /hello, /hello/type_html. Also
- # ensure that /hello/type_html is registered BEFORE /hello. (tests
- # that largest matching URI is picked properly)
- # - Create URI handler /adder. Make sure it uses a custom free_ctx
- # structure to free it up
- # 1. Using Standard Python HTTP Client
- # - simple GET on /hello (returns Hello World. Ensures that basic
- # firmware tests are complete, or returns error)
- # - POST on /hello (should fail)
- # - PUT on /hello (should fail)
- # - simple POST on /echo (returns whatever the POST data)
- # - simple PUT on /echo (returns whatever the PUT data)
- # - GET on /echo (should fail)
- # - simple GET on /hello/type_html (returns Content type as text/html)
- # - simple GET on /hello/status_500 (returns HTTP status 500)
- # - simple GET on /false_uri (returns HTTP status 404)
- # - largest matching URI handler is picked is already verified because
- # of /hello and /hello/type_html tests
- #
- #
- # 2. Session Tests
- # - Sessions + Pipelining basics:
- # - Create max supported sessions
- # - On session i,
- # - send 3 back-to-back POST requests with data i on /adder
- # - read back 3 responses. They should be i, 2i and 3i
- # - Tests that
- # - pipelining works
- # - per-session context is maintained for all supported
- # sessions
- # - Close all sessions
- #
- # - Cleanup leftover data: Tests that the web server properly cleans
- # up leftover data
- # - Create a session
- # - POST on /leftover_data with 52 bytes of data (data includes
- # \r\n)(the handler only
- # reads first 10 bytes and returns them, leaving the rest of the
- # bytes unread)
- # - GET on /hello (should return 'Hello World')
- # - POST on /false_uri with 52 bytes of data (data includes \r\n)
- # (should return HTTP 404)
- # - GET on /hello (should return 'Hello World')
- #
- # - Test HTTPd Asynchronous response
- # - Create a session
- # - GET on /async_data
- # - returns 'Hello World!' as a response
- # - the handler schedules an async response, which generates a second
- # response 'Hello Double World!'
- #
- # - Spillover test
- # - Create max supported sessions with the web server
- # - GET /hello on all the sessions (should return Hello World)
- # - Create one more session, this should fail
- # - GET /hello on all the sessions (should return Hello World)
- #
- # - Timeout test
- # - Create a session and only Send 'GE' on the same (simulates a
- # client that left the network halfway through a request)
- # - Wait for recv-wait-timeout
- # - Server should automatically close the socket
- # ############ TODO TESTS #############
- # 3. Stress Tests
- #
- # - httperf
- # - Run the following httperf command:
- # httperf --server=10.31.130.126 --wsess=8,50,0.5 --rate 8 --burst-length 2
- #
- # - The above implies that the test suite will open
- # - 8 simultaneous connections with the server
- # - the rate of opening the sessions will be 8 per sec. So in our
- # case, a new connection will be opened every 0.2 seconds for 1 second
- # - The burst length 2 indicates that 2 requests will be sent
- # simultaneously on the same connection in a single go
- # - 0.5 seconds is the time between sending out 2 bursts
- # - 50 is the total number of requests that will be sent out
- #
- # - So in the above example, the test suite will open 8
- # connections, each separated by 0.2 seconds. On each connection
- # it will send 2 requests in a single burst. The bursts on a
- # single connection will be separated by 0.5 seconds. A total of
- # 25 bursts (25 x 2 = 50) will be sent out
- # 4. Leak Tests
- # - Simple Leak test
- # - Simple GET on /hello/restart (returns success, stop web server, measures leaks, restarts webserver)
- # - Simple GET on /hello/restart_results (returns the leak results)
- # - Leak test with open sockets
- # - Open 8 sessions
- # - Simple GET on /hello/restart (returns success, stop web server,
- # measures leaks, restarts webserver)
- # - All sockets should get closed
- # - Simple GET on /hello/restart_results (returns the leak results)
- import argparse
- import http.client
- import logging
- import random
- import socket
- import string
- import sys
- import threading
- import time
# Module-wide flag checked by adder_thread.run() before it logs its start
# message; the script entry point sets it to True.
_verbose_ = False
class Session(object):
    """Minimal raw-socket HTTP/1.1 client used to talk to the server under test.

    Unlike http.client it lets a test send arbitrary (even malformed) bytes
    through ``self.client`` and read several responses back-to-back on the
    same connection.

    Attributes:
        client: the connected TCP socket.
        target: address used for the ``Host`` header.
        status: status code of the last parsed response, as a string
            (``0`` until a response has been parsed).
        encoding: value of the last ``Transfer-Encoding`` header, or ``''``.
        content_type: value of the last ``Content-Type`` header, or ``''``.
        content_len: body length of the last response in bytes.
    """

    def __init__(self, addr, port, timeout=15):
        """Connect to ``addr:port``; ``timeout`` (seconds) applies to all socket I/O."""
        self.client = socket.create_connection((addr, int(port)), timeout=timeout)
        self.target = addr
        self.status = 0
        self.encoding = ''
        self.content_type = ''
        self.content_len = 0

    def send_err_check(self, request, data=None):
        """Send ``request`` (and ``data`` if non-empty), both given as str.

        Returns True on success. On a socket error the socket is closed, the
        error is logged and False is returned.
        """
        try:
            self.client.sendall(request.encode())
            if data:
                self.client.sendall(data.encode())
        except socket.error as err:
            self.client.close()
            logging.info('Socket Error in send :{}'.format(err))
            return False
        return True

    def _build_request(self, method, path, headers=None, data=None):
        """Return the request line plus headers for ``method`` on ``path``.

        A ``Content-Length`` header is appended when ``data`` is not None; it
        counts encoded bytes so it stays correct for non-ASCII payloads.
        """
        lines = [method + ' ' + path + ' HTTP/1.1', 'Host: ' + self.target]
        if headers:
            lines.extend(field + ': ' + value for field, value in headers.items())
        if data is not None:
            lines.append('Content-Length: ' + str(len(data.encode())))
        return '\r\n'.join(lines) + '\r\n\r\n'

    def send_get(self, path, headers=None):
        """Send a GET request for ``path``; returns True if it was sent."""
        return self.send_err_check(self._build_request('GET', path, headers))

    def send_put(self, path, data, headers=None):
        """Send a PUT request carrying ``data``; returns True if it was sent."""
        return self.send_err_check(self._build_request('PUT', path, headers, data), data)

    def send_post(self, path, data, headers=None):
        """Send a POST request carrying ``data``; returns True if it was sent."""
        return self.send_err_check(self._build_request('POST', path, headers, data), data)

    def _recv_exact(self, count):
        """Read exactly ``count`` bytes from the socket.

        Raises ConnectionError (an OSError, so callers' ``socket.error``
        handlers catch it) if the peer closes the connection first.
        """
        buf = b''
        while len(buf) < count:
            piece = self.client.recv(count - len(buf))
            if not piece:
                raise ConnectionError('connection closed by peer')
            buf += piece
        return buf

    def _recv_line(self):
        """Read bytes up to and including CRLF; return them without the CRLF."""
        line = b''
        while not line.endswith(b'\r\n'):
            line += self._recv_exact(1)
        return line[:-2]

    def read_resp_hdrs(self):
        """Read and parse one response's status line and headers.

        Updates ``status``, ``encoding``, ``content_type`` and ``content_len``
        and returns a dict of header name -> value. Returns None (after
        closing the socket and logging) on a socket error, if the peer closes
        before the header block is complete, or if the status line is
        malformed.
        """
        try:
            raw = b''
            # Read byte-by-byte so nothing belonging to the body (or to the
            # next pipelined response) is consumed.
            while not raw.endswith(b'\r\n\r\n'):
                byte = self.client.recv(1)
                if not byte:
                    # recv() returning b'' means the peer closed the socket;
                    # without this check the loop would never terminate.
                    self.client.close()
                    logging.info('Connection closed before response headers were complete')
                    return None
                raw += byte
        except socket.error as err:
            self.client.close()
            logging.info('Socket Error in recv :{}'.format(err))
            return None

        line_hdrs = raw.decode().splitlines()
        status_line = line_hdrs[0].split()
        if len(status_line) < 2:
            logging.info('Malformed status line: {}'.format(line_hdrs[0]))
            return None
        self.status = status_line[1]

        # Reset per-response state so values from a previous response on the
        # same session cannot leak into this one.
        self.encoding = ''
        self.content_type = ''
        self.content_len = 0
        headers = dict()
        for line in line_hdrs[1:]:
            # partition() splits on the first ':' only, so header values that
            # themselves contain colons (dates, URLs) are kept intact.
            name, sep, value = line.partition(':')
            if not sep:
                continue
            value = value.lstrip()
            if name == 'Content-Length':
                self.content_len = int(value)
            elif name == 'Content-Type':
                self.content_type = value
            elif name == 'Transfer-Encoding':
                self.encoding = value
            headers[name] = value
        return headers

    def read_resp_data(self):
        """Read the body announced by the last read_resp_hdrs() call.

        Handles both ``Content-Length`` and ``chunked`` bodies and returns the
        decoded text. Returns None on a socket error, premature close or
        malformed chunk framing.
        """
        try:
            if self.encoding != 'chunked':
                return self._recv_exact(self.content_len).decode()

            body = b''
            while True:
                try:
                    # Chunk extensions (after ';') are ignored.
                    size_field = self._recv_line().decode().split(';', 1)[0]
                    chunk_len = int(size_field, 16)
                except ValueError:
                    logging.info('Error in chunked data')
                    return None
                if chunk_len < 0:
                    logging.info('Error in chunked data')
                    return None
                # Keep a running total of the decoded body size
                self.content_len += chunk_len
                body += self._recv_exact(chunk_len)
                # Every chunk, including the final zero-length one, is
                # followed by CRLF
                if self._recv_exact(2) != b'\r\n':
                    logging.info('Error in chunked data')
                    return None
                if not chunk_len:
                    break
            return body.decode()
        except socket.error as err:
            self.client.close()
            logging.info('Socket Error in recv :{}'.format(err))
            return None

    def close(self):
        """Close the underlying socket."""
        self.client.close()
- def test_val(text, expected, received):
- if expected != received:
- logging.info(' Fail!')
- logging.info(' [reason] {}:'.format(text))
- logging.info(' expected: {}'.format(expected))
- logging.info(' received: {}'.format(received))
- return False
- return True
class adder_thread (threading.Thread):
    """Thread that sends a series of POSTs to /adder on its own Session.

    Each request carries the thread id as its body; the responses are
    collected in ``self.response`` and are expected to be id, 2*id, 3*id, ...
    """

    def __init__(self, id, dut, port):
        """Open a Session to ``dut:port`` for the thread numbered ``id``."""
        super().__init__()
        self.id = id
        self.dut = dut
        # Number of requests sent on the session
        self.depth = 3
        self.session = Session(dut, port)

    def run(self):
        """Send ``depth`` POSTs, then read back ``depth`` responses."""
        self.response = []

        if _verbose_:
            logging.info(' Thread: Using adder start {}'.format(self.id))

        payload = str(self.id)
        for _ in range(self.depth):
            self.session.send_post('/adder', payload)
            time.sleep(2)

        for _ in range(self.depth):
            self.session.read_resp_hdrs()
            body = self.session.read_resp_data()
            self.response.append(body)

    def adder_result(self):
        """Return True if every collected response equals id * (position + 1)."""
        if len(self.response) != self.depth:
            logging.info('Error : missing response packets')
            return False
        for idx, answer in enumerate(self.response):
            label = 'Thread' + str(self.id) + ' response[' + str(idx) + ']'
            wanted = str(self.id * (idx + 1))
            if not test_val(label, wanted, str(answer)):
                return False
        return True

    def close(self):
        """Close the thread's session."""
        self.session.close()
def get_hello(dut, port):
    """GET /hello and check status 200, body 'Hello World!' and text/html type.

    Returns True when all three checks pass, False otherwise.
    """
    logging.info("[test] GET /hello returns 'Hello World!' =>")
    conn = http.client.HTTPConnection(dut, int(port), timeout=15)
    conn.request('GET', '/hello')
    resp = conn.getresponse()
    # `and` short-circuits: a later check only runs if the earlier ones passed
    passed = (test_val('status_code', 200, resp.status)
              and test_val('data', 'Hello World!', resp.read().decode())
              and test_val('data', 'text/html', resp.getheader('Content-Type')))
    if passed:
        logging.info('Success')
    conn.close()
    return passed
def put_hello(dut, port):
    """PUT on /hello must be answered with status 405; returns True if so."""
    logging.info('[test] PUT /hello returns 405 =>')
    conn = http.client.HTTPConnection(dut, int(port), timeout=15)
    conn.request('PUT', '/hello', 'Hello')
    status = conn.getresponse().status
    passed = test_val('status_code', 405, status)
    if passed:
        logging.info('Success')
    conn.close()
    return passed
def post_hello(dut, port):
    """POST on /hello must be answered with status 405; returns True if so."""
    logging.info('[test] POST /hello returns 405 =>')
    conn = http.client.HTTPConnection(dut, int(port), timeout=15)
    conn.request('POST', '/hello', 'Hello')
    status = conn.getresponse().status
    passed = test_val('status_code', 405, status)
    if passed:
        logging.info('Success')
    conn.close()
    return passed
def post_echo(dut, port):
    """POST 'Hello' to /echo and expect status 200 with the same body back.

    Returns True when both checks pass, False otherwise.
    """
    logging.info('[test] POST /echo echoes data =>')
    conn = http.client.HTTPConnection(dut, int(port), timeout=15)
    conn.request('POST', '/echo', 'Hello')
    resp = conn.getresponse()
    # The body is only read and compared if the status check passed
    passed = (test_val('status_code', 200, resp.status)
              and test_val('data', 'Hello', resp.read().decode()))
    if passed:
        logging.info('Success')
    conn.close()
    return passed
def put_echo(dut, port):
    """PUT 'Hello' to /echo and expect status 200 with the same body back.

    Returns True when both checks pass, False otherwise.
    """
    logging.info('[test] PUT /echo echoes data =>')
    conn = http.client.HTTPConnection(dut, int(port), timeout=15)
    conn.request('PUT', '/echo', 'Hello')
    resp = conn.getresponse()
    # The body is only read and compared if the status check passed
    passed = (test_val('status_code', 200, resp.status)
              and test_val('data', 'Hello', resp.read().decode()))
    if passed:
        logging.info('Success')
    conn.close()
    return passed
def get_echo(dut, port):
    """GET on /echo must be answered with status 405; returns True if so."""
    logging.info('[test] GET /echo returns 405 =>')
    conn = http.client.HTTPConnection(dut, int(port), timeout=15)
    conn.request('GET', '/echo')
    status = conn.getresponse().status
    passed = test_val('status_code', 405, status)
    if passed:
        logging.info('Success')
    conn.close()
    return passed
def get_test_headers(dut, port):
    """GET /test_header repeatedly with differently padded Header2 values.

    For each variant the response must have status 200 and a body equal to
    the Header2 value from the start of 'Value2' onwards (leading whitespace
    dropped), or an empty body when the value does not contain 'Value2'.

    Returns True when every variant passes, False at the first failure.
    """
    logging.info('[test] GET /test_header =>')
    conn = http.client.HTTPConnection(dut, int(port), timeout=15)
    custom_header = {'Header1': 'Value1', 'Header3': 'Value3'}
    for val in ('', ' ', 'Value2', ' Value2', 'Value2 ', ' Value2 '):
        custom_header['Header2'] = val
        conn.request('GET', '/test_header', headers=custom_header)
        resp = conn.getresponse()
        start = val.find('Value2')
        wanted = '' if start == -1 else val[start:]
        if not (test_val('status_code', 200, resp.status)
                and test_val('header: Header2', wanted, resp.read().decode())):
            conn.close()
            return False
        # Kept from the original: a second read after the body was consumed
        resp.read()
    logging.info('Success')
    conn.close()
    return True
def get_hello_type(dut, port):
    """GET /hello/type_html and check status, body and Content-Type.

    Expects status 200, body 'Hello World!' and Content-Type text/html.
    Returns True when all three checks pass, False otherwise.
    """
    logging.info('[test] GET /hello/type_html has Content-Type of text/html =>')
    conn = http.client.HTTPConnection(dut, int(port), timeout=15)
    conn.request('GET', '/hello/type_html')
    resp = conn.getresponse()
    # `and` short-circuits: a later check only runs if the earlier ones passed
    passed = (test_val('status_code', 200, resp.status)
              and test_val('data', 'Hello World!', resp.read().decode())
              and test_val('data', 'text/html', resp.getheader('Content-Type')))
    if passed:
        logging.info('Success')
    conn.close()
    return passed
def get_hello_status(dut, port):
    """GET /hello/status_500 must be answered with status 500; True if so."""
    logging.info('[test] GET /hello/status_500 returns status 500 =>')
    conn = http.client.HTTPConnection(dut, int(port), timeout=15)
    conn.request('GET', '/hello/status_500')
    status = conn.getresponse().status
    passed = test_val('status_code', 500, status)
    if passed:
        logging.info('Success')
    conn.close()
    return passed
def get_false_uri(dut, port):
    """GET /false_uri must be answered with status 404; returns True if so."""
    logging.info('[test] GET /false_uri returns status 404 =>')
    conn = http.client.HTTPConnection(dut, int(port), timeout=15)
    conn.request('GET', '/false_uri')
    status = conn.getresponse().status
    passed = test_val('status_code', 404, status)
    if passed:
        logging.info('Success')
    conn.close()
    return passed
def parallel_sessions_adder(dut, port, max_sessions):
    """Run ``max_sessions`` adder_thread workers concurrently against /adder.

    All workers are created (each opening its own session), started and
    joined; every worker's result is then checked and its session closed.

    Returns True only if every worker reported a correct result.
    """
    logging.info('[test] POST {pipelined} on /adder in ' + str(max_sessions) + ' sessions =>')

    # Create all sessions before starting any of the threads
    workers = [adder_thread(i, dut, port) for i in range(max_sessions)]

    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()

    res = True
    for idx, worker in enumerate(workers):
        if not test_val('Thread' + str(idx) + ' Failed', worker.adder_result(), True):
            res = False
        worker.close()

    if res:
        logging.info('Success')
    return res
def async_response_test(dut, port):
    """Check that one GET on /async_data yields two responses on one session.

    The first response body must be 'Hello World!' and the second
    'Hello Double World!'. Returns True if both match, False otherwise.
    """
    logging.info('[test] Test HTTPD Work Queue (Async response) =>')
    s = Session(dut, port)
    s.send_get('/async_data')

    expectations = (
        ('First Response', 'Hello World!'),
        ('Second Response', 'Hello Double World!'),
    )
    # Both responses arrive over the same session, one after the other
    for label, body in expectations:
        s.read_resp_hdrs()
        if not test_val(label, body, s.read_resp_data()):
            s.close()
            return False

    s.close()
    logging.info('Success')
    return True
def leftover_data_test(dut, port):
    """Check that unread POST data does not corrupt the following request.

    Sequence on one connection:
      1. POST a two-line payload to /leftover_data; the reply must be the
         first 10 bytes, 'abcdefghij'.
      2. GET /hello must still return 'Hello World!'.
      3. POST the same payload to /false_uri; the reply must be status 404.
    Then, on a fresh connection, GET /hello must return 'Hello World!'.

    Args:
        dut: address of the server under test.
        port: TCP port, as int or numeric string.

    Returns:
        True when every step passes, False at the first failure.
    """
    logging.info('[test] Leftover data in POST is purged (valid and invalid URIs) =>')
    payload = 'abcdefghijklmnopqrstuvwxyz\r\nabcdefghijklmnopqrstuvwxyz'

    # Host and port are passed separately (as the other tests in this file
    # do) so that an int port works and no 'host:port' string is parsed.
    s = http.client.HTTPConnection(dut, int(port), timeout=15)
    try:
        s.request('POST', url='/leftover_data', body=payload)
        resp = s.getresponse()
        if not test_val('Partial data', 'abcdefghij', resp.read().decode()):
            return False

        s.request('GET', url='/hello')
        resp = s.getresponse()
        if not test_val('Hello World Data', 'Hello World!', resp.read().decode()):
            return False

        s.request('POST', url='/false_uri', body=payload)
        resp = s.getresponse()
        if not test_val('False URI Status', str(404), str(resp.status)):
            return False
    finally:
        # Also runs when a request raises, so the connection never leaks
        s.close()

    # socket would have been closed by server due to error, so reconnect
    s = http.client.HTTPConnection(dut, int(port), timeout=15)
    try:
        s.request('GET', url='/hello')
        resp = s.getresponse()
        if not test_val('Hello World Data', 'Hello World!', resp.read().decode()):
            return False
    finally:
        s.close()

    logging.info('Success')
    return True
def spillover_session(dut, port, max_sess):
    """Check that the server accepts exactly ``max_sess`` concurrent sessions.

    Opens up to ``max_sess + 1`` connections, issuing GET /hello on each and
    keeping it open. Stops at the first connection that raises or returns an
    unexpected body, then closes everything that was opened.

    Args:
        dut: address of the server under test.
        port: TCP port, as int or numeric string.
        max_sess: number of sessions the server is expected to accept.

    Returns:
        True if exactly ``max_sess`` connections were served.
    """
    logging.info('[test] Session max_sess_sessions ({}) + 1 is rejected =>'.format(max_sess))
    accepted = []
    for i in range(max_sess + 1):
        logging.info('Executing {}'.format(i))
        conn = None
        try:
            conn = http.client.HTTPConnection(dut, int(port), timeout=15)
            conn.request('GET', url='/hello')
            resp = conn.getresponse()
            if not test_val('Connection ' + str(i), 'Hello World!', resp.read().decode()):
                conn.close()
                break
            accepted.append(conn)
        except Exception:
            # A rejected session can surface as several exception types
            # (timeout, reset, empty reply), so the catch is deliberately broad.
            logging.info('Connection {} rejected'.format(i))
            if conn is not None:
                conn.close()
            break

    # Close open connections
    for conn in accepted:
        conn.close()

    # Check if number of connections is equal to max_sess
    passed = len(accepted) == max_sess
    logging.info('Success' if passed else 'Fail')
    return passed
def recv_timeout_test(dut, port):
    """Send only the bytes 'GE' and expect the server's timeout response.

    The response body must be 'Server closed this connection'.
    Returns True if it is, False otherwise.
    """
    logging.info('[test] Timeout occurs if partial packet sent =>')
    s = Session(dut, port)
    # Deliberately incomplete request line
    s.client.sendall(b'GE')
    s.read_resp_hdrs()
    body = s.read_resp_data()
    timed_out = test_val('Request Timeout', 'Server closed this connection', body)
    s.close()
    if timed_out:
        logging.info('Success')
    return timed_out
def packet_size_limit_test(dut, port, test_size):
    """POST a large random payload to /echo and check it is echoed intact.

    Up to 5 attempts are made. When the server answers 500 the payload size
    is divided by 10 before the next attempt; any other failure is retried
    with the same size.

    Args:
        dut: address of the server under test.
        port: TCP port, as int or numeric string.
        test_size: initial payload size in characters.

    Returns:
        True as soon as one attempt echoes the payload back unchanged,
        False if all attempts fail.
    """
    logging.info('[test] send size limit test =>')
    for _ in range(5):
        logging.info('data size = {}'.format(test_size))
        # random.choices draws uniformly; the previous randint-based index
        # selected the last printable character twice as often as the others.
        random_data = ''.join(random.choices(string.printable, k=test_size))
        s = http.client.HTTPConnection(dut, int(port), timeout=15)
        try:
            s.request('POST', url='/echo', body=random_data)
            resp = s.getresponse()
            status = str(resp.status)
            if not test_val('Error', '200', status):
                if test_val('Error', '500', status):
                    logging.info('Data too large to be allocated')
                    test_size = test_size // 10
                else:
                    logging.info('Unexpected error')
                logging.info('Retry...')
                continue
            echoed = resp.read().decode()
            if echoed != random_data:
                test_val('Data size', str(len(random_data)), str(len(echoed)))
                logging.info('Retry...')
                continue
        finally:
            # Runs on success, on `continue` and on exceptions alike
            s.close()
        logging.info('Success')
        return True
    logging.info('Failed')
    return False
def arbitrary_termination_test(dut, port):
    """Send requests with unusual line terminators and check the responses.

    Each case is a raw request on a fresh session plus the expected status
    code and, optionally, the expected value of the 'Custom' response header
    or the expected response body. The cases mix CRLF and bare LF terminators
    (expected 200) with stray CRs and a malformed header (expected 400).

    Args:
        dut: address of the server under test (also used as Host header).
        port: TCP port of the server.

    Returns:
        True when every case passes, False at the first failure.
    """
    logging.info('[test] Arbitrary termination test =>')
    cases = [
        {
            'request': 'POST /echo HTTP/1.1\r\nHost: ' + dut + '\r\nCustom: SomeValue\r\n\r\n',
            'code': '200',
            'header': 'SomeValue'
        },
        {
            'request': 'POST /echo HTTP/1.1\nHost: ' + dut + '\r\nCustom: SomeValue\r\n\r\n',
            'code': '200',
            'header': 'SomeValue'
        },
        {
            'request': 'POST /echo HTTP/1.1\r\nHost: ' + dut + '\nCustom: SomeValue\r\n\r\n',
            'code': '200',
            'header': 'SomeValue'
        },
        {
            'request': 'POST /echo HTTP/1.1\r\nHost: ' + dut + '\r\nCustom: SomeValue\n\r\n',
            'code': '200',
            'header': 'SomeValue'
        },
        {
            'request': 'POST /echo HTTP/1.1\r\nHost: ' + dut + '\r\nCustom: SomeValue\r\n\n',
            'code': '200',
            'header': 'SomeValue'
        },
        {
            'request': 'POST /echo HTTP/1.1\nHost: ' + dut + '\nCustom: SomeValue\n\n',
            'code': '200',
            'header': 'SomeValue'
        },
        {
            'request': 'POST /echo HTTP/1.1\r\nHost: ' + dut + '\r\nContent-Length: 5\n\r\nABCDE',
            'code': '200',
            'body': 'ABCDE'
        },
        {
            'request': 'POST /echo HTTP/1.1\r\nHost: ' + dut + '\r\nContent-Length: 5\r\n\nABCDE',
            'code': '200',
            'body': 'ABCDE'
        },
        {
            'request': 'POST /echo HTTP/1.1\r\nHost: ' + dut + '\r\nContent-Length: 5\n\nABCDE',
            'code': '200',
            'body': 'ABCDE'
        },
        {
            'request': 'POST /echo HTTP/1.1\r\nHost: ' + dut + '\r\nContent-Length: 5\n\n\rABCD',
            'code': '200',
            'body': '\rABCD'
        },
        {
            'request': 'POST /echo HTTP/1.1\r\nHost: ' + dut + '\r\r\nCustom: SomeValue\r\r\n\r\r\n',
            'code': '400'
        },
        {
            'request': 'POST /echo HTTP/1.1\r\r\nHost: ' + dut + '\r\n\r\n',
            'code': '400'
        },
        {
            'request': 'POST /echo HTTP/1.1\r\n\rHost: ' + dut + '\r\n\r\n',
            'code': '400'
        },
        {
            'request': 'POST /echo HTTP/1.1\r\nHost: ' + dut + '\rCustom: SomeValue\r\n',
            'code': '400'
        },
        {
            'request': 'POST /echo HTTP/1.1\r\nHost: ' + dut + '\r\nCustom: Some\rValue\r\n',
            'code': '400'
        },
        {
            'request': 'POST /echo HTTP/1.1\r\nHost: ' + dut + '\r\nCustom- SomeValue\r\n\r\n',
            'code': '400'
        }
    ]
    for case in cases:
        s = Session(dut, port)
        try:
            s.client.sendall(case['request'].encode())
            resp_hdrs = s.read_resp_hdrs()
            resp_body = s.read_resp_data()
        finally:
            # Close the session even if the send raises
            s.close()
        if not test_val('Response Code', case['code'], s.status):
            return False
        if 'header' in case:
            # read_resp_hdrs() returns None when the read failed; treat that
            # as "header absent" instead of crashing on None.keys()
            resp_hdr_val = (resp_hdrs or {}).get('Custom')
            if not test_val('Response Header', case['header'], resp_hdr_val):
                return False
        if 'body' in case:
            if not test_val('Response Body', case['body'], resp_body):
                return False
    logging.info('Success')
    return True
def code_500_server_error_test(dut, port):
    """POST to /echo announcing a 2**30-byte body and expect status 500.

    Returns True if the server answered '500', False otherwise.
    """
    logging.info('[test] 500 Server Error test =>')
    s = Session(dut, port)
    # Per the original note: a very large content length is meant to make
    # the server's allocation fail
    content_len = 2**30
    request = ('POST /echo HTTP/1.1\r\nHost: ' + dut
               + '\r\nContent-Length: ' + str(content_len) + '\r\n\r\nABCD')
    s.client.sendall(request.encode())
    s.read_resp_hdrs()
    s.read_resp_data()
    passed = test_val('Server Error', '500', s.status)
    s.close()
    if passed:
        logging.info('Success')
    return passed
def code_501_method_not_impl(dut, port):
    """Send a request with the unknown method 'ABC' to /hello.

    The server presently answers 400 Bad Request rather than 501, so '400'
    is the status this test asserts. Returns True if it matches.
    """
    logging.info('[test] 501 Method Not Implemented =>')
    s = Session(dut, port)
    path = '/hello'
    request = 'ABC ' + path + ' HTTP/1.1\r\nHost: ' + dut + '\r\n\r\n'
    s.client.sendall(request.encode())
    s.read_resp_hdrs()
    s.read_resp_data()
    # Expect 400 for now; switch to '501' once the server reports it
    passed = test_val('Server Error', '400', s.status)
    s.close()
    if passed:
        logging.info('Success')
    return passed
def code_505_version_not_supported(dut, port):
    """GET /hello with version 'HTTP/2.0' and expect status 505.

    Returns True if the server answered '505', False otherwise.
    """
    logging.info('[test] 505 Version Not Supported =>')
    s = Session(dut, port)
    path = '/hello'
    request = 'GET ' + path + ' HTTP/2.0\r\nHost: ' + dut + '\r\n\r\n'
    s.client.sendall(request.encode())
    s.read_resp_hdrs()
    s.read_resp_data()
    passed = test_val('Server Error', '505', s.status)
    s.close()
    if passed:
        logging.info('Success')
    return passed
def code_400_bad_request(dut, port):
    """Send a request with the bogus method 'XYZ' and expect status 400.

    Returns True if the server answered '400', False otherwise.
    """
    logging.info('[test] 400 Bad Request =>')
    s = Session(dut, port)
    path = '/hello'
    request = 'XYZ ' + path + ' HTTP/1.1\r\nHost: ' + dut + '\r\n\r\n'
    s.client.sendall(request.encode())
    s.read_resp_hdrs()
    s.read_resp_data()
    passed = test_val('Client Error', '400', s.status)
    s.close()
    if passed:
        logging.info('Success')
    return passed
def code_404_not_found(dut, port):
    """GET the path /dummy and expect status 404.

    Returns True if the server answered '404', False otherwise.
    """
    logging.info('[test] 404 Not Found =>')
    s = Session(dut, port)
    path = '/dummy'
    request = 'GET ' + path + ' HTTP/1.1\r\nHost: ' + dut + '\r\n\r\n'
    s.client.sendall(request.encode())
    s.read_resp_hdrs()
    s.read_resp_data()
    passed = test_val('Client Error', '404', s.status)
    s.close()
    if passed:
        logging.info('Success')
    return passed
def code_405_method_not_allowed(dut, port):
    """POST to /hello and expect status 405.

    Returns True if the server answered '405', False otherwise.
    """
    logging.info('[test] 405 Method Not Allowed =>')
    s = Session(dut, port)
    path = '/hello'
    request = 'POST ' + path + ' HTTP/1.1\r\nHost: ' + dut + '\r\n\r\n'
    s.client.sendall(request.encode())
    s.read_resp_hdrs()
    s.read_resp_data()
    passed = test_val('Client Error', '405', s.status)
    s.close()
    if passed:
        logging.info('Success')
    return passed
def code_408_req_timeout(dut, port):
    """POST to /echo announcing 10 body bytes but send only 4; expect 408.

    Returns True if the server answered '408', False otherwise.
    """
    logging.info('[test] 408 Request Timeout =>')
    s = Session(dut, port)
    # Content-Length promises more data than is ever sent
    request = 'POST /echo HTTP/1.1\r\nHost: ' + dut + '\r\nContent-Length: 10\r\n\r\nABCD'
    s.client.sendall(request.encode())
    s.read_resp_hdrs()
    s.read_resp_data()
    passed = test_val('Client Error', '408', s.status)
    s.close()
    if passed:
        logging.info('Success')
    return passed
def code_411_length_required(dut, port):
    """POST to /echo with 'Transfer-Encoding: chunked' and no Content-Length.

    The server presently answers 400 Bad Request rather than 411, so '400'
    is the status this test asserts. Returns True if it matches.
    """
    logging.info('[test] 411 Length Required =>')
    s = Session(dut, port)
    path = '/echo'
    request = ('POST ' + path + ' HTTP/1.1\r\nHost: ' + dut
               + '\r\nContent-Type: text/plain\r\nTransfer-Encoding: chunked\r\n\r\n')
    s.client.sendall(request.encode())
    s.read_resp_hdrs()
    s.read_resp_data()
    # Expect 400 for now; switch to '411' once the server reports it
    passed = test_val('Client Error', '400', s.status)
    s.close()
    if passed:
        logging.info('Success')
    return passed
def send_getx_uri_len(dut, port, length):
    """Send a GET whose request line is ``length`` characters long.

    The path is '/' followed by enough 'x' characters that method, path and
    version (including its CRLF) add up to ``length``. The request is sent in
    three pieces with a one-second pause after each of the first two.

    Returns:
        The status code of the response, as stored on the session.
    """
    s = Session(dut, port)
    verb = 'GET '
    proto = ' HTTP/1.1\r\n'
    filler = length - len(verb) - len(proto) - len('/')
    uri = '/' + 'x' * filler
    for fragment in (verb, uri):
        s.client.sendall(fragment.encode())
        time.sleep(1)
    tail = proto + 'Host: ' + dut + '\r\n\r\n'
    s.client.sendall(tail.encode())
    s.read_resp_hdrs()
    s.read_resp_data()
    s.close()
    return s.status
def code_414_uri_too_long(dut, port, max_uri_len):
    """Probe the URI length limit from both sides.

    A request line of exactly ``max_uri_len`` must yield 404, and one
    character more must yield 414. Returns True if both hold.
    """
    logging.info('[test] 414 URI Too Long =>')
    probes = (
        (max_uri_len, '404'),
        (max_uri_len + 1, '414'),
    )
    for uri_len, wanted in probes:
        status = send_getx_uri_len(dut, port, uri_len)
        if not test_val('Client Error', wanted, status):
            return False
    logging.info('Success')
    return True
def send_postx_hdr_len(dut, port, length):
    """POST to /echo with a 'Custom' header padded with 'x' characters.

    The header value length is derived from ``length`` and the sizes of the
    Host line, the header field name and the final terminator. The request is
    sent in two halves split at ``length // 2``, one second apart.

    Returns:
        ``(matched, body)`` when the response carries a 'Custom' header,
        where ``matched`` tells whether it equals the value sent; otherwise
        ``(False, status)`` with the response status code.
    """
    s = Session(dut, port)
    host_line = 'Host: ' + dut
    field = '\r\nCustom: '
    terminator = '\r\n\r\n'
    value_len = length - len(host_line) - len(field) - len(terminator) + len('0')
    value = 'x' * value_len
    raw = ('POST ' + '/echo' + ' HTTP/1.1\r\n' + host_line + field + value + terminator).encode()

    midpoint = length // 2
    s.client.sendall(raw[:midpoint])
    time.sleep(1)
    s.client.sendall(raw[midpoint:])

    hdr = s.read_resp_hdrs()
    resp = s.read_resp_data()
    s.close()
    if not (hdr and 'Custom' in hdr):
        return False, s.status
    return (hdr['Custom'] == value), resp
def code_431_hdr_too_long(dut, port, max_hdr_len):
    """Probe the header length limit from both sides.

    With ``max_hdr_len`` the custom header must be echoed back unchanged;
    with one more the server must answer 431. Returns True if both hold.
    """
    logging.info('[test] 431 Header Too Long =>')
    echoed, _ = send_postx_hdr_len(dut, port, max_hdr_len)
    if not echoed:
        return False
    _, status = send_postx_hdr_len(dut, port, max_hdr_len + 1)
    if not test_val('Client Error', '431', status):
        return False
    logging.info('Success')
    return True
def test_upgrade_not_supported(dut, port):
    """Send 'OPTIONS *' asking for an upgrade to TLS/1.0 and expect 400.

    Returns True if the server answered '400', False otherwise.
    """
    logging.info('[test] Upgrade Not Supported =>')
    s = Session(dut, port)
    request = ('OPTIONS * HTTP/1.1\r\nHost:' + dut
               + '\r\nUpgrade: TLS/1.0\r\nConnection: Upgrade\r\n\r\n')
    s.client.sendall(request.encode())
    s.read_resp_hdrs()
    s.read_resp_data()
    passed = test_val('Client Error', '400', s.status)
    s.close()
    if passed:
        logging.info('Success')
    return passed
if __name__ == '__main__':
    # Script entry point: run every test group against the server given on
    # the command line and exit non-zero if any test reported failure.

    # Configuration
    # Max number of threads/sessions
    max_sessions = 7
    max_uri_len = 512
    max_hdr_len = 512

    parser = argparse.ArgumentParser(description='Run HTTPD Test')
    parser.add_argument('-4', '--ipv4', help='IPv4 address')
    parser.add_argument('-6', '--ipv6', help='IPv6 address')
    parser.add_argument('-p', '--port', help='Port')
    args = vars(parser.parse_args())
    dut4 = args['ipv4']
    dut6 = args['ipv6']  # parsed but not used by any test below
    port = args['port']
    dut = dut4
    if dut is None or port is None:
        # Fail early with a usage message instead of a TypeError deep inside
        # the first test
        parser.error('both --ipv4 and --port are required')

    # Every test reports through logging.info(); without a configured handler
    # at INFO level nothing would be printed when run as a script.
    logging.basicConfig(level=logging.INFO, format='%(message)s')

    _verbose_ = True

    # Each entry: (test function, extra positional args after dut and port)
    suites = [
        ('### Basic HTTP Client Tests', [
            (get_hello, ()),
            (post_hello, ()),
            (put_hello, ()),
            (post_echo, ()),
            (get_echo, ()),
            (put_echo, ()),
            (get_hello_type, ()),
            (get_hello_status, ()),
            (get_false_uri, ()),
            (get_test_headers, ()),
        ]),
        ('### Error code tests', [
            (code_500_server_error_test, ()),
            (code_501_method_not_impl, ()),
            (code_505_version_not_supported, ()),
            (code_400_bad_request, ()),
            (code_404_not_found, ()),
            (code_405_method_not_allowed, ()),
            (code_408_req_timeout, ()),
            (code_414_uri_too_long, (max_uri_len,)),
            (code_431_hdr_too_long, (max_hdr_len,)),
            (test_upgrade_not_supported, ()),
            # Not supported yet (Error on chunked request):
            # code_411_length_required is intentionally left out
        ]),
        ('### Sessions and Context Tests', [
            (parallel_sessions_adder, (max_sessions,)),
            (leftover_data_test, ()),
            (async_response_test, ()),
            (spillover_session, (max_sessions,)),
            (recv_timeout_test, ()),
            (packet_size_limit_test, (50 * 1024,)),
            (arbitrary_termination_test, ()),
            # Final sanity check that the server still answers
            (get_hello, ()),
        ]),
    ]

    failed = []
    for title, tests in suites:
        logging.info(title)
        for func, extra in tests:
            # A failing test does not stop the run; it is only recorded
            if not func(dut, port, *extra):
                failed.append(func.__name__)

    if failed:
        logging.info('Failed tests: {}'.format(', '.join(failed)))
    sys.exit(1 if failed else 0)
|