# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See LICENSE for more details.
#
# Copyright: Red Hat Inc. 2014
# Authors: Cleber Rosa <cleber@redhat.com>
__doc__ = """
GDB Communication and Debugging Utilities
This module provides comprehensive functionality for interacting with the GNU Debugger (GDB)
through multiple interfaces and protocols. It supports both local debugging sessions and
remote debugging scenarios.
Key Features:
- GDB/MI (Machine Interface) communication for programmatic control
- GDB Server management for remote debugging sessions
- GDB Remote Protocol client implementation
- Command execution with structured result parsing
- Breakpoint management and program flow control
- Support for both CLI and MI command interfaces
Main Classes:
GDB: Wraps a local GDB subprocess with MI interface communication
GDBServer: Manages a gdbserver instance for remote debugging
GDBRemote: Implements GDB remote protocol client for direct communication
CommandResult: Encapsulates command execution results and metadata
Common Usage Patterns:
- Automated debugging workflows in test environments
- Remote debugging of embedded or containerized applications
- Programmatic analysis of application crashes and core dumps
- Integration with continuous integration and testing frameworks
The module handles low-level protocol details, message parsing, and connection management,
providing a high-level Python interface for GDB operations.
"""
__all__ = ["GDB", "GDBServer", "GDBRemote"]
import fcntl
import os
import socket
import subprocess
import tempfile
import time
from autils_external import gdbmi_parser
from autils.file.path import find_command
from autils.network import ports
GDB_PROMPT = b"(gdb)"
GDB_EXIT = b"^exit"
GDB_BREAK_CONDITIONS = [GDB_PROMPT, GDB_EXIT]
#: How the remote protocol signals a transmission success (in ACK mode)
REMOTE_TRANSMISSION_SUCCESS = "+"
#: How the remote protocol signals a transmission failure (in ACK mode)
REMOTE_TRANSMISSION_FAILURE = "-"
#: How the remote protocol flags the start of a packet
REMOTE_PREFIX = b"$"
#: How the remote protocol flags the end of the packet payload, and that the
#: two digits checksum follow
REMOTE_DELIMITER = b"#"
#: Rather conservative default maximum packet size for clients using the
#: remote protocol. Individual connections can ask (and do so by default)
#: the server about the maximum packet size they can handle.
REMOTE_MAX_PACKET_SIZE = 1024
class UnexpectedResponseError(Exception):
"""A response different from the one expected was received from GDB"""
class ServerInitTimeoutError(Exception):
"""Server took longer than expected to initialize itself properly"""
class InvalidPacketError(Exception):
"""Packet received has invalid format"""
class NotConnectedError(Exception):
"""GDBRemote is not connected to a remote GDB server"""
class RetransmissionRequestedError(Exception):
"""Message integrity was not validated and retransmission is being requested"""
def parse_mi(line):
"""Parse a GDB/MI line
:param line: a string supposedly coming from GDB using MI language
:type line: str
:returns: a parsed GDB/MI response
:rtype: gdbmi_parser.GdbMiRecord
"""
if not line.endswith("\n"):
line = f"{line}\n"
return gdbmi_parser.session().process(line)
def encode_mi_cli(command):
"""Encodes a regular (CLI) command into the proper MI form
:param command: the regular cli command to send
:type command: str
:returns: the encoded (escaped) MI command
:rtype: str
"""
return f'-interpreter-exec console "{command}"'
def is_stopped_exit(parsed_mi_msg):
"""Check if a parsed GDB MI message indicates the program exited normally.
:param parsed_mi_msg: a parsed GDB MI message object
:type parsed_mi_msg: gdbmi_parser.GdbMiRecord
:returns: True if the message indicates normal program exit, False otherwise
:rtype: bool
"""
return (
hasattr(parsed_mi_msg, "class_")
and (parsed_mi_msg.class_ == "stopped")
and hasattr(parsed_mi_msg, "result")
and hasattr(parsed_mi_msg.result, "reason")
and (parsed_mi_msg.result.reason == "exited")
)
def is_thread_group_exit(parsed_mi_msg):
"""Check if a parsed GDB MI message indicates a thread group has exited.
:param parsed_mi_msg: a parsed GDB MI message object
:type parsed_mi_msg: gdbmi_parser.GdbMiRecord
:returns: True if the message indicates thread group exit, False otherwise
:rtype: bool
"""
return hasattr(parsed_mi_msg, "class_") and (
parsed_mi_msg.class_ == "thread-group-exited"
)
def is_exit(parsed_mi_msg):
"""Check if a parsed GDB MI message indicates any type of program exit.
This function combines checks for both normal program exit and thread group exit.
:param parsed_mi_msg: a parsed GDB MI message object
:type parsed_mi_msg: gdbmi_parser.GdbMiRecord
:returns: True if the message indicates any form of program exit, False otherwise
:rtype: bool
"""
return is_stopped_exit(parsed_mi_msg) or is_thread_group_exit(parsed_mi_msg)
def is_break_hit(parsed_mi_msg):
"""Check if a parsed GDB MI message indicates a breakpoint was hit.
:param parsed_mi_msg: a parsed GDB MI message object
:type parsed_mi_msg: gdbmi_parser.GdbMiRecord
:returns: True if the message indicates a breakpoint hit, False otherwise
:rtype: bool
"""
return (
hasattr(parsed_mi_msg, "class_")
and (parsed_mi_msg.class_ == "stopped")
and hasattr(parsed_mi_msg, "result")
and hasattr(parsed_mi_msg.result, "reason")
and (parsed_mi_msg.result.reason == "breakpoint-hit")
)
def is_sigsegv(parsed_mi_msg):
"""Check if a parsed GDB MI message indicates a segmentation fault (SIGSEGV).
:param parsed_mi_msg: a parsed GDB MI message object
:type parsed_mi_msg: gdbmi_parser.GdbMiRecord
:returns: True if the message indicates SIGSEGV signal, False otherwise
:rtype: bool
"""
return (
hasattr(parsed_mi_msg, "class_")
and (parsed_mi_msg.class_ == "stopped")
and hasattr(parsed_mi_msg, "result")
and hasattr(parsed_mi_msg.result, "signal_name")
and (parsed_mi_msg.result.reason == "SIGSEGV")
)
def is_sigabrt_stopped(parsed_mi_msg):
"""Check if a parsed GDB MI message indicates a SIGABRT signal with stopped status.
:param parsed_mi_msg: a parsed GDB MI message object
:type parsed_mi_msg: gdbmi_parser.GdbMiRecord
:returns: True if the message indicates SIGABRT in stopped state, False otherwise
:rtype: bool
"""
return (
hasattr(parsed_mi_msg, "class_")
and (parsed_mi_msg.class_ == "stopped")
and hasattr(parsed_mi_msg, "record_type")
and (parsed_mi_msg.record_type == "result")
and (parsed_mi_msg.result.reason == "signal-received")
and (parsed_mi_msg.result.signal_name == "SIGABRT")
)
def is_sigabrt_console(parsed_mi_msg):
"""Check if a parsed GDB MI message indicates a SIGABRT signal from console output.
:param parsed_mi_msg: a parsed GDB MI message object
:type parsed_mi_msg: gdbmi_parser.GdbMiRecord
:returns: True if the message indicates SIGABRT from console, False otherwise
:rtype: bool
"""
return (
hasattr(parsed_mi_msg, "record_type")
and (parsed_mi_msg.record_type == "stream")
and hasattr(parsed_mi_msg, "type")
and (parsed_mi_msg.type == "console")
and hasattr(parsed_mi_msg, "value")
and parsed_mi_msg.value == "SIGABRT, Aborted.\n"
)
def is_sigabrt(parsed_mi_msg):
"""Check if a parsed GDB MI message indicates a SIGABRT signal from any source.
This function combines checks for SIGABRT from both stopped state and console output.
:param parsed_mi_msg: a parsed GDB MI message object
:type parsed_mi_msg: gdbmi_parser.GdbMiRecord
:returns: True if the message indicates SIGABRT from any source, False otherwise
:rtype: bool
"""
return is_sigabrt_stopped(parsed_mi_msg) or is_sigabrt_console(parsed_mi_msg)
def is_fatal_signal(parsed_mi_msg):
"""Check if a parsed GDB MI message indicates a fatal signal (SIGSEGV or SIGABRT).
This function identifies signals that typically indicate serious program errors
that would cause the program to terminate abnormally.
:param parsed_mi_msg: a parsed GDB MI message object
:type parsed_mi_msg: gdbmi_parser.GdbMiRecord
:returns: True if the message indicates a fatal signal, False otherwise
:rtype: bool
"""
return is_sigsegv(parsed_mi_msg) or is_sigabrt(parsed_mi_msg)
def format_as_hex(char):
"""Formats a single ascii character as a lower case hex string
:param char: a single ascii character
:type char: str
:returns: the character formatted as a lower case hex string
:rtype: str
"""
return f"{ord(char):2x}"
def string_to_hex(text):
"""Formats a string of text into an hex representation
:param text: a multi character string
:type text: str
:returns: the string converted to an hex representation
:rtype: str
"""
return "".join(map(format_as_hex, text))
class CommandResult:
"""A GDB command, its result, and other possible messages"""
def __init__(self, command):
self.command = command
self.timestamp = time.monotonic()
self.stream_messages = []
self.application_output = []
self.result = None
def get_application_output(self):
"""Return all application output concatenated as a single string
:returns: application output concatenated
:rtype: str
"""
return "".join(self.application_output)
def get_stream_messages_text(self):
"""Return all stream messages text concatenated as a single string
:returns: stream messages text concatenated
:rtype: str
"""
return "".join([m.value for m in self.stream_messages])
def __repr__(self):
return f"{self.command} at {self.timestamp:.9f}"
# pylint: disable=E1101
[docs]
class GDB:
"""Wraps a GDB subprocess for easier manipulation"""
REQUIRED_ARGS = ["--interpreter=mi", "--quiet"]
DEFAULT_BREAK = "main"
def __init__(self, path=None, *extra_args): # pylint: disable=W1113
if path is None:
path = find_command("gdb", default="/usr/bin/gdb")
self.path = path
args = [self.path]
args += self.REQUIRED_ARGS
args += extra_args
try:
self.process = subprocess.Popen( # pylint: disable=R1732
args,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
close_fds=True,
)
except OSError as details:
if details.errno == 2:
exc = OSError(f"File '{args[0]}' not found")
exc.errno = 2
raise exc from details
raise
fcntl.fcntl(self.process.stdout.fileno(), fcntl.F_SETFL, os.O_NONBLOCK)
self.read_until_break()
# If this instance is connected to another target. If so, what
# tcp port it's connected to
self.connected_to = None
# any GDB MI async messages
self.async_messages = []
self.commands_history = []
# whatever comes from the app that is not a GDB MI message
self.output_messages = []
self.output_messages_queue = []
[docs]
def read_gdb_response(self, timeout=0.01, max_tries=100):
"""Read raw responses from GDB
:param timeout: the amount of time to way between read attempts
:type timeout: float
:param max_tries: the maximum number of cycles to try to read until
a response is obtained
:type max_tries: int
:returns: a string containing a raw response from GDB
:rtype: str
:raises ValueError: if can't read GDB response
"""
current_try = 0
while current_try < max_tries:
try:
line = self.process.stdout.readline()
line = line.strip()
if line:
return line
except IOError:
current_try += 1
if current_try >= max_tries:
raise ValueError("Could not read GDB response")
time.sleep(timeout)
return None
[docs]
def read_until_break(self, max_lines=100):
"""Read lines from GDB until a break condition is reached
:param max_lines: the maximum number of lines to read
:type max_lines: int
:returns: a list of messages read
:rtype: list of str
"""
result = []
while True:
line = self.read_gdb_response()
if line in GDB_BREAK_CONDITIONS:
break
if len(result) >= max_lines:
break
result.append(line)
return result
[docs]
def send_gdb_command(self, command):
"""Send a raw command to the GNU debugger input
:param command: the GDB command, hopefully in MI language
:type command: str
"""
if not command.endswith("\n"):
command = f"{command}\n"
self.process.stdin.write(command.encode())
self.process.stdin.flush()
[docs]
def cmd(self, command):
"""Sends a command and parses all lines until prompt is received
:param command: the GDB command, hopefully in MI language
:type command: str
:returns: a :class:`CommandResult` instance
:rtype: :class:`CommandResult`
:raises gdbmi_parser.GdbMiError: if there are many result responses to a
single cmd
"""
cmd = CommandResult(command)
self.send_gdb_command(command)
responses = self.read_until_break()
result_response_received = False
for line in responses:
# If the line can not be properly parsed, it is *most likely*
# generated by the application being run inside the debugger
try:
parsed_response = parse_mi(line.decode())
except Exception: # pylint: disable=W0703
cmd.application_output.append(line)
continue
if (
parsed_response.type == "console"
and parsed_response.record_type == "stream"
):
cmd.stream_messages.append(parsed_response)
elif parsed_response.type == "result":
if result_response_received:
# raise an exception here, because no two result
# responses should come from a single command AFAIK
raise gdbmi_parser.GdbMiError(
"Many result responses to a single cmd"
)
result_response_received = True
cmd.result = parsed_response
else:
self.async_messages.append(parsed_response)
return cmd
[docs]
def cli_cmd(self, command):
"""Sends a cli command encoded as an MI command
:param command: a regular GDB cli command
:type command: str
:returns: a :class:`CommandResult` instance
:rtype: :class:`CommandResult`
"""
cmd = encode_mi_cli(command)
return self.cmd(cmd)
[docs]
def cmd_exists(self, command):
"""Checks if a given command exists
:param command: a GDB MI command, including the dash (-) prefix
:type command: str
:returns: either True or False
:rtype: bool
"""
gdb_info_command = f"-info-gdb-mi-command {command[1:]}"
r = self.cmd(gdb_info_command)
return r.result.result.command.exists == "true"
[docs]
def set_file(self, path):
"""Sets the file that will be executed
:param path: the path of the binary that will be executed
:type path: str
:returns: a :class:`CommandResult` instance
:rtype: :class:`CommandResult`
:raises UnexpectedResponseError: if response is unexpected
"""
cmd = f"-file-exec-and-symbols {path}"
r = self.cmd(cmd)
if not r.result.class_ == "done":
raise UnexpectedResponseError
if self.connected_to is not None:
cmd = f"set remote exec-file {path}"
r = self.cmd(cmd)
if not r.result.class_ == "done":
raise UnexpectedResponseError
return r
[docs]
def set_break(self, location, ignore_error=False):
"""Sets a new breakpoint on the binary currently being debugged
:param location: a breakpoint location expression as accepted by GDB
:type location: str
:param ignore_error: if set, won't raise exceptions
:type ignore_error: bool
:returns: a :class:`CommandResult` instance
:rtype: :class:`CommandResult`
:raises UnexpectedResponseError: if response is unexpected
"""
cmd = f"-break-insert {location}"
r = self.cmd(cmd)
if not r.result.class_ == "done":
if not ignore_error:
raise UnexpectedResponseError
return r
[docs]
def del_break(self, number):
"""Deletes a breakpoint by its number
:param number: the breakpoint number
:type number: int
:returns: a :class:`CommandResult` instance
:rtype: :class:`CommandResult`
:raises UnexpectedResponseError: if response is unexpected
"""
cmd = f"-break-delete {number}"
r = self.cmd(cmd)
if not r.result.class_ == "done":
raise UnexpectedResponseError
return r
[docs]
def run(self, args=None):
"""Runs the application inside the debugger
:param args: the arguments to be passed to the binary as command line
arguments
:type args: builtin.list
:returns: a :class:`CommandResult` instance
:rtype: :class:`CommandResult`
:raises UnexpectedResponseError: if response is unexpected
"""
if args:
args_text = " ".join(args)
cmd = f"-exec-arguments {args_text}"
r = self.cmd(cmd)
if not r.result.class_ == "done":
raise UnexpectedResponseError
r = self.cmd("-exec-run")
if not r.result.class_ == "running":
raise UnexpectedResponseError
return r
[docs]
def connect(self, port):
"""Connects to a remote debugger (a gdbserver) at the given TCP port
This uses the "extended-remote" target type only
:param port: the TCP port number
:type port: int
:returns: a :class:`CommandResult` instance
:rtype: :class:`CommandResult`
:raises UnexpectedResponseError: if response is unexpected
"""
cmd = f"-target-select extended-remote :{port}"
r = self.cmd(cmd)
if not r.result.class_ == "connected":
raise UnexpectedResponseError
self.connected_to = port
return r
[docs]
def disconnect(self):
"""Disconnects from a remote debugger
:returns: a :class:`CommandResult` instance
:rtype: :class:`CommandResult`
:raises UnexpectedResponseError: if response is unexpected
"""
cmd = "-target-disconnect"
r = self.cmd(cmd)
if not r.result.class_ == "done":
raise UnexpectedResponseError
self.connected_to = None
return r
[docs]
def exit(self):
"""Exits the GDB application gracefully
:returns: the result of :meth:`subprocess.POpen.wait`, that is, a
:attr:`subprocess.POpen.returncode`
:rtype: int or None
"""
self.cmd("-gdb-exit")
return self.process.wait()
[docs]
class GDBServer:
"""Wraps a gdbserver instance"""
#: The default arguments used when starting the GDB server process
REQUIRED_ARGS = ["--multi"]
#: The range from which a port to GDB server will try to be allocated from
PORT_RANGE = (20000, 20999)
#: The time to optionally wait for the server to initialize itself and be
#: ready to accept new connections
INIT_TIMEOUT = 5.0
# pylint: disable=W0613, W1113
def __init__(
self,
path=None,
port=None,
wait_until_running=True,
*extra_args,
):
"""Initializes a new gdbserver instance
:param path: location of the gdbserver binary
:type path: str
:param port: tcp port number to listen on for incoming connections
:type port: int
:param wait_until_running: wait until the gdbserver is running and
accepting connections. It may take a little
after the process is started and it is
actually bound to the allocated port
:type wait_until_running: bool
:param extra_args: optional extra arguments to be passed to gdbserver
"""
if path is None:
path = find_command("gdbserver", default="/usr/bin/gdbserver")
self.path = path
args = [self.path]
args += self.REQUIRED_ARGS
if port is None:
self.port = ports.find_free_port(*self.PORT_RANGE)
else:
self.port = port
args.append(f":{self.port}")
prefix = f"avocado_gdbserver_{self.port}_"
_, self.stdout_path = tempfile.mkstemp(prefix=prefix + "stdout_")
# pylint: disable=R1732
self.stdout = open(self.stdout_path, "w", encoding="utf-8")
_, self.stderr_path = tempfile.mkstemp(prefix=prefix + "stderr_")
# pylint: disable=R1732
self.stderr = open(self.stderr_path, "w", encoding="utf-8")
try:
# pylint: disable=R1732
self.process = subprocess.Popen(
args,
stdin=subprocess.PIPE,
stdout=self.stdout,
stderr=self.stderr,
close_fds=True,
)
except OSError as details:
if details.errno == 2:
exc = OSError(f"File '{args[0]}' not found")
exc.errno = 2
raise exc from details
raise
if wait_until_running:
self._wait_until_running()
def _wait_until_running(self):
connection_ok = False
c = GDB()
end_time = time.monotonic() + self.INIT_TIMEOUT
while time.monotonic() < end_time:
try:
c.connect(self.port)
connection_ok = True
break
except UnexpectedResponseError:
time.sleep(0.1)
c.disconnect()
c.exit()
if not connection_ok:
raise ServerInitTimeoutError
[docs]
def exit(self, force=True):
"""Quits the gdb_server process
Most correct way of quitting the GDB server is by sending it a command.
If no GDB client is connected, then we can try to connect to it and
send a quit command. If this is not possible, we send it a signal and
wait for it to finish.
:param force: if a forced exit (sending SIGTERM) should be attempted
:type force: bool
"""
temp_client = GDB()
try:
temp_client.connect(self.port)
temp_client.cli_cmd("monitor exit")
except (UnexpectedResponseError, ValueError):
if force:
self.process.kill()
finally:
try:
temp_client.disconnect()
temp_client.exit()
except (UnexpectedResponseError, ValueError):
if force:
temp_client.process.kill()
temp_client.process.wait()
self.process.wait()
self.stdout.close()
self.stderr.close()
[docs]
class GDBRemote:
"""A GDBRemote acts like a client that speaks the GDB remote protocol,
documented at:
https://sourceware.org/gdb/current/onlinedocs/gdb/Remote-Protocol.html
Caveat: we currently do not support communicating with devices, only
with TCP sockets. This limitation is basically due to the lack of
use cases that justify an implementation, but not due to any technical
shortcoming.
"""
def __init__(self, host, port, no_ack_mode=True, extended_mode=True):
"""Initializes a new GDBRemote object.
:param host: the IP address or host name
:type host: str
:param port: the port number where the the remote GDB is listening on
:type port: int
:param no_ack_mode: if the packet transmission confirmation mode should
be disabled
:type no_ack_mode: bool
:param extended_mode: if the remote extended mode should be enabled
:type param extended_mode: bool
"""
self.host = host
self.port = port
# Temporary holder for the class init attributes
self._no_ack_mode = no_ack_mode
self.no_ack_mode = False
self._extended_mode = extended_mode
self.extended_mode = False
self._socket = None
[docs]
@staticmethod
def checksum(input_message):
"""Calculates a remote message checksum.
More details are available at:
https://sourceware.org/gdb/current/onlinedocs/gdb/Overview.html
:param input_message: the message input payload, without the
start and end markers
:type input_message: bytes
:returns: two byte checksum
:rtype: bytes
"""
total = 0
for i in input_message:
total += i
result = total % 256
return b"%02x" % result
[docs]
@staticmethod
def encode(data):
"""Encodes a command.
That is, add prefix, suffix and checksum.
More details are available at:
https://sourceware.org/gdb/current/onlinedocs/gdb/Overview.html
:param data: the command data payload
:type data: bytes
:returns: the encoded command, ready to be sent to a remote GDB
:rtype: bytes
"""
return b"$%b#%b" % (data, GDBRemote.checksum(data))
[docs]
@staticmethod
def decode(data):
"""Decodes a packet and returns its payload.
More details are available at:
https://sourceware.org/gdb/current/onlinedocs/gdb/Overview.html
:param data: the command data payload
:type data: bytes
:returns: the encoded command, ready to be sent to a remote GDB
:rtype: bytes
:raises InvalidPacketError: if the packet is not well constructed,
like in checksum mismatches
"""
if data[0:1] != REMOTE_PREFIX:
raise InvalidPacketError
if data[-3:-2] != REMOTE_DELIMITER:
raise InvalidPacketError
payload = data[1:-3]
checksum = data[-2:]
if payload == b"":
expected_checksum = b"00"
else:
expected_checksum = GDBRemote.checksum(payload)
if checksum != expected_checksum:
raise InvalidPacketError
return payload
[docs]
def cmd(self, command_data, expected_response=None):
"""Sends a command data to a remote gdb server
Limitations: the current version does not deal with retransmissions.
:param command_data: the remote command to send the the remote stub
:type command_data: str
:param expected_response: the (optional) response that is expected
as a response for the command sent
:type expected_response: str
:returns: raw data read from from the remote server
:rtype: str
:raises NotConnectedError: if the socket is not initialized
:raises RetransmissionRequestedError: if there was a failure while
reading the result of the command
:raises UnexpectedResponseError: if response is unexpected
"""
if self._socket is None:
raise NotConnectedError
data = self.encode(command_data)
self._socket.send(data)
if not self.no_ack_mode:
transmission_result = self._socket.recv(1)
if transmission_result == REMOTE_TRANSMISSION_FAILURE:
raise RetransmissionRequestedError
result = self._socket.recv(REMOTE_MAX_PACKET_SIZE)
response_payload = self.decode(result)
if expected_response is not None:
if expected_response != response_payload:
raise UnexpectedResponseError
return response_payload
[docs]
def set_extended_mode(self):
"""Enable extended mode. In extended mode, the remote server is made
persistent. The 'R' packet is used to restart the program being
debugged. Original documentation at:
https://sourceware.org/gdb/current/onlinedocs/gdb/Packets.html#extended-mode
"""
self.cmd(b"!", b"OK")
self.extended_mode = True
[docs]
def start_no_ack_mode(self):
"""Request that the remote stub disable the normal +/- protocol
acknowledgments. Original documentation at:
https://sourceware.org/gdb/current/onlinedocs/gdb/General-Query-Packets.html#QStartNoAckMode
"""
self.cmd(b"QStartNoAckMode", b"OK")
self.no_ack_mode = True
[docs]
def connect(self):
"""Connects to the remote target and initializes the chosen modes"""
self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._socket.connect((self.host, self.port))
if self._no_ack_mode:
self.start_no_ack_mode()
if self._extended_mode:
self.set_extended_mode()