move towards single implementation for socket client/server using asyncore and threads
This commit is contained in:
1
.gitignore
vendored
1
.gitignore
vendored
@@ -1,3 +1,4 @@
|
||||
/venv
|
||||
/.pydevproject
|
||||
/.project
|
||||
*.pyc
|
||||
|
||||
13
TcpClient.py
13
TcpClient.py
@@ -1,13 +0,0 @@
|
||||
import socket
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
sock.connect(('localhost', 8881))
|
||||
print "Connected to server"
|
||||
data = """A few lines of data
|
||||
to test the operation
|
||||
of both server and client."""
|
||||
for line in data.splitlines( ):
|
||||
sock.sendall(line+'\n')
|
||||
print "Sent:", line
|
||||
response = sock.recv(8192)
|
||||
print "Received:", response
|
||||
sock.close( )
|
||||
@@ -1,24 +0,0 @@
|
||||
import asyncore
|
||||
import socket
|
||||
|
||||
class MainServerSocket(asyncore.dispatcher):
|
||||
def __init__(self, port):
|
||||
asyncore.dispatcher.__init__(self)
|
||||
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
self.bind(('',port))
|
||||
self.listen(5)
|
||||
def handle_accept(self):
|
||||
newSocket, address = self.accept( )
|
||||
print "Connected from", address
|
||||
SecondaryServerSocket(newSocket)
|
||||
|
||||
class SecondaryServerSocket(asyncore.dispatcher_with_send):
|
||||
def handle_read(self):
|
||||
receivedData = self.recv(8192)
|
||||
if receivedData: self.send(receivedData)
|
||||
else: self.close( )
|
||||
def handle_close(self):
|
||||
print "Disconnected from", self.getpeername( )
|
||||
|
||||
MainServerSocket(10000)
|
||||
asyncore.loop( )
|
||||
52
mysocket/TcpClient.py
Normal file
52
mysocket/TcpClient.py
Normal file
@@ -0,0 +1,52 @@
|
||||
import asyncore
|
||||
import socket
|
||||
import threading
|
||||
import time
|
||||
|
||||
class TcpClient(asyncore.dispatcher):
|
||||
queue = []
|
||||
|
||||
def __init__(self, host, port):
|
||||
asyncore.dispatcher.__init__(self)
|
||||
self.address = (host, port)
|
||||
|
||||
def handle_connect(self):
|
||||
print "Connected to server"
|
||||
|
||||
def handle_close(self):
|
||||
self.close()
|
||||
|
||||
def handle_read(self):
|
||||
response = self.recv(8192)
|
||||
print "Received:", response
|
||||
|
||||
def writable(self):
|
||||
return (len(self.queue) > 0)
|
||||
|
||||
def handle_write(self):
|
||||
for data in self.queue:
|
||||
self.send(data)
|
||||
self.queue = []
|
||||
|
||||
def start(self):
|
||||
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
self.connect(self.address)
|
||||
self.thread = threading.Thread(target=asyncore.loop, kwargs = {'timeout': 1})
|
||||
self.thread.start()
|
||||
|
||||
def stop(self):
|
||||
self.close()
|
||||
self.thread.join()
|
||||
|
||||
def _send(self, data):
|
||||
self.queue.append(data)
|
||||
|
||||
if __name__ == '__main__':
|
||||
client = TcpClient('localhost', 10000)
|
||||
client.start()
|
||||
|
||||
for i in range(1, 50):
|
||||
client.send('client #%d' % i)
|
||||
time.sleep(1)
|
||||
|
||||
client.stop()
|
||||
70
mysocket/TcpServer.py
Normal file
70
mysocket/TcpServer.py
Normal file
@@ -0,0 +1,70 @@
|
||||
import asyncore
|
||||
import socket
|
||||
import threading
|
||||
import time
|
||||
|
||||
class TcpServer(asyncore.dispatcher):
|
||||
clients = []
|
||||
|
||||
def __init__(self, host, port):
|
||||
asyncore.dispatcher.__init__(self)
|
||||
self.address = (host, port)
|
||||
|
||||
def handle_accept(self):
|
||||
mysocket, address = self.accept()
|
||||
print 'Connected from', address
|
||||
self.clients.append(TcpServerClient(self, mysocket))
|
||||
|
||||
def start(self):
|
||||
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
self.bind(self.address)
|
||||
self.listen(5)
|
||||
self.thread = threading.Thread(target=asyncore.loop, kwargs = {'timeout': 0.5})
|
||||
self.thread.start()
|
||||
|
||||
def stop(self):
|
||||
self.close()
|
||||
self.thread.join()
|
||||
|
||||
def send(self, message):
|
||||
for client in self.clients:
|
||||
client._send(message)
|
||||
|
||||
def input(self, client, data):
|
||||
print 'input from ', client.getpeername(), ': ', data
|
||||
|
||||
class TcpServerClient(asyncore.dispatcher_with_send):
|
||||
queue = []
|
||||
|
||||
def __init__(self, server, mysocket):
|
||||
asyncore.dispatcher_with_send.__init__(self, mysocket)
|
||||
self.server = server
|
||||
|
||||
def handle_read(self):
|
||||
data = self.recv(8192)
|
||||
if data:
|
||||
self.server.input(self, data)
|
||||
|
||||
def handle_close(self):
|
||||
print 'Disconnected from', self.getpeername()
|
||||
|
||||
def handle_write(self):
|
||||
for data in self.queue:
|
||||
self.send(data)
|
||||
self.queue = []
|
||||
|
||||
def writable(self):
|
||||
return len(self.queue) > 0
|
||||
|
||||
def _send(self, data):
|
||||
self.queue.append(data)
|
||||
|
||||
if __name__ == '__main__':
|
||||
server = TcpServer('', 10000)
|
||||
server.start()
|
||||
|
||||
for i in range(1, 50):
|
||||
server.send('server #%d' % i)
|
||||
time.sleep(1)
|
||||
|
||||
server.stop()
|
||||
0
mysocket/__init__.py
Normal file
0
mysocket/__init__.py
Normal file
@@ -1,21 +0,0 @@
|
||||
import socket
|
||||
import sys
|
||||
|
||||
HOST, PORT = "localhost", 10000
|
||||
data = " ".join(sys.argv[1:])
|
||||
|
||||
# Create a socket (SOCK_STREAM means a TCP socket)
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
|
||||
try:
|
||||
# Connect to server and send data
|
||||
sock.connect((HOST, PORT))
|
||||
sock.sendall(bytes(data + "\n"))
|
||||
|
||||
# Receive data from the server and shut down
|
||||
received = str(sock.recv(1024));
|
||||
finally:
|
||||
sock.close()
|
||||
|
||||
print("Sent: {}".format(data))
|
||||
print("Received: {}".format(received))
|
||||
@@ -1,60 +0,0 @@
|
||||
import socket
|
||||
import select
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
sock.bind(('', 10000))
|
||||
sock.listen(5)
|
||||
|
||||
# lists of sockets to watch for input and output events
|
||||
ins = [sock]
|
||||
ous = []
|
||||
# mapping socket -> data to send on that socket when feasible
|
||||
data = {}
|
||||
# mapping socket -> (host, port) on which the client is running
|
||||
adrs = {}
|
||||
|
||||
try:
|
||||
while True:
|
||||
i, o, e = select.select(ins, ous, []) # no excepts nor timeout
|
||||
for x in i:
|
||||
if x is sock:
|
||||
# input event on sock means client trying to connect
|
||||
newSocket, address = sock.accept( )
|
||||
print "Connected from", address
|
||||
ins.append(newSocket)
|
||||
adrs[newSocket] = address
|
||||
else:
|
||||
# other input events mean data arrived, or disconnections
|
||||
newdata = x.recv(8192)
|
||||
if newdata:
|
||||
# data arrived, prepare and queue the response to it
|
||||
print "%d bytes from %s" % (len(newdata), adrs[x])
|
||||
data[x] = data.get(x, '') + newdata
|
||||
if x not in ous: ous.append(x)
|
||||
else:
|
||||
# a disconnect, give a message and clean up
|
||||
print "disconnected from", adrs[x]
|
||||
del adrs[x]
|
||||
try:
|
||||
ins.remove(x)
|
||||
ous.remove(x)
|
||||
except ValueError: pass
|
||||
x.close( )
|
||||
for x in o:
|
||||
# output events always mean we can send some data
|
||||
tosend = data.get(x)
|
||||
if tosend:
|
||||
nsent = x.send(tosend)
|
||||
print "%d bytes to %s" % (nsent, adrs[x])
|
||||
# remember data still to be sent, if any
|
||||
tosend = tosend[nsent:]
|
||||
if tosend:
|
||||
print "%d bytes remain for %s" % (len(tosend), adrs[x])
|
||||
data[x] = tosend
|
||||
else:
|
||||
try: del data[x]
|
||||
except KeyError: pass
|
||||
ous.remove(x)
|
||||
print "No data currently remain for", adrs[x]
|
||||
finally:
|
||||
sock.close( )
|
||||
|
||||
@@ -1,29 +0,0 @@
|
||||
import socket
|
||||
import sys
|
||||
|
||||
# Create a TCP/IP socket
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
|
||||
# Connect the socket to the port where the server is listening
|
||||
server_address = ('localhost', 10000)
|
||||
print >>sys.stderr, 'connecting to %s port %s' % server_address
|
||||
sock.connect(server_address)
|
||||
|
||||
try:
|
||||
# Send data
|
||||
message = 'This is the message. It will be repeated.'
|
||||
print >>sys.stderr, 'sending "%s"' % message
|
||||
sock.sendall(message)
|
||||
|
||||
# Look for the response
|
||||
amount_received = 0
|
||||
amount_expected = len(message)
|
||||
|
||||
while amount_received < amount_expected:
|
||||
data = sock.recv(16)
|
||||
amount_received += len(data)
|
||||
print >>sys.stderr, 'received "%s"' % data
|
||||
|
||||
finally:
|
||||
print >>sys.stderr, 'closing socket'
|
||||
sock.close()
|
||||
@@ -1,36 +0,0 @@
|
||||
import socket
|
||||
import sys
|
||||
|
||||
# Create a TCP/IP socket
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
|
||||
# Bind the socket to the port
|
||||
server_address = ('localhost', 10000)
|
||||
print >>sys.stderr, 'starting up on %s port %s' % server_address
|
||||
sock.bind(server_address)
|
||||
|
||||
# Listen for incoming connections
|
||||
sock.listen(1)
|
||||
|
||||
while True:
|
||||
# Wait for a connection
|
||||
print >>sys.stderr, 'waiting for a connection'
|
||||
connection, client_address = sock.accept()
|
||||
|
||||
try:
|
||||
print >>sys.stderr, 'connection from', client_address
|
||||
|
||||
# Receive the data in small chunks and retransmit it
|
||||
while True:
|
||||
data = connection.recv(1024)
|
||||
print >>sys.stderr, 'received "%s"' % data
|
||||
if data:
|
||||
print >>sys.stderr, 'sending data back to the client'
|
||||
connection.sendall(data)
|
||||
else:
|
||||
print >>sys.stderr, 'no more data from', client_address
|
||||
break
|
||||
|
||||
finally:
|
||||
# Clean up the connection
|
||||
connection.close()
|
||||
@@ -1,38 +0,0 @@
|
||||
import SocketServer
|
||||
|
||||
class MyTCPHandler(SocketServer.BaseRequestHandler):
|
||||
"""
|
||||
The RequestHandler class for our server.
|
||||
|
||||
It is instantiated once per connection to the server, and must
|
||||
override the handle() method to implement communication to the
|
||||
client.
|
||||
"""
|
||||
|
||||
def handle(self):
|
||||
# self.request is the TCP socket connected to the client
|
||||
self.data = self.request.recv(1024).strip()
|
||||
print("{} wrote:".format(self.client_address[0]))
|
||||
print(self.data)
|
||||
# just send back the same data, but upper-cased
|
||||
self.request.sendall(self.data.upper())
|
||||
|
||||
# def handle(self):
|
||||
# self.rfile is a file-like object created by the handler;
|
||||
# we can now use e.g. readline() instead of raw recv() calls
|
||||
# self.data = self.rfile.readline().strip()
|
||||
# print("{} wrote:".format(self.client_address[0]))
|
||||
# print(self.data)
|
||||
# Likewise, self.wfile is a file-like object used to write back
|
||||
# to the client
|
||||
# self.wfile.write(self.data.upper())
|
||||
|
||||
if __name__ == "__main__":
|
||||
HOST, PORT = "localhost", 10000
|
||||
|
||||
# Create the server, binding to localhost on port 9999
|
||||
server = SocketServer.TCPServer((HOST, PORT), MyTCPHandler)
|
||||
|
||||
# Activate the server; this will keep running until you
|
||||
# interrupt the program with Ctrl-C
|
||||
server.serve_forever()
|
||||
@@ -14,7 +14,7 @@ class ThreadedTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
|
||||
pass
|
||||
|
||||
def client(ip, port, message):
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
sock = socket.mysocket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
sock.connect((ip, port))
|
||||
try:
|
||||
sock.sendall(message)
|
||||
|
||||
@@ -1,21 +0,0 @@
|
||||
import SocketServer
|
||||
|
||||
class MyUDPHandler(SocketServer.BaseRequestHandler):
|
||||
"""
|
||||
This class works similar to the TCP handler class, except that
|
||||
self.request consists of a pair of data and client socket, and since
|
||||
there is no connection the client address must be given explicitly
|
||||
when sending data back via sendto().
|
||||
"""
|
||||
|
||||
def handle(self):
|
||||
data = self.request[0].strip()
|
||||
socket = self.request[1]
|
||||
print "{} wrote:".format(self.client_address[0])
|
||||
print data
|
||||
socket.sendto(data.upper(), self.client_address)
|
||||
|
||||
if __name__ == "__main__":
|
||||
HOST, PORT = "localhost", 9999
|
||||
server = SocketServer.UDPServer((HOST, PORT), MyUDPHandler)
|
||||
server.serve_forever()
|
||||
34
test.py
Normal file
34
test.py
Normal file
@@ -0,0 +1,34 @@
|
||||
from mysocket import TcpClient, TcpServer
|
||||
import time
|
||||
import unittest
|
||||
|
||||
class TestTcpCommunication(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.server = TcpServer('', 10000)
|
||||
self.client = TcpClient('localhost', 10000)
|
||||
|
||||
self.server.start()
|
||||
self.client.start()
|
||||
pass
|
||||
|
||||
def tearDown(self):
|
||||
self.client.stop()
|
||||
self.server.stop()
|
||||
|
||||
def test_tcp_communication(self):
|
||||
for i in range(1, 50):
|
||||
self.server.send('server #%d' % i)
|
||||
time.sleep(1)
|
||||
|
||||
for i in range(1, 50):
|
||||
self.client.send('client #%d' % i)
|
||||
time.sleep(1)
|
||||
|
||||
if __name__ == '__main__':
|
||||
testList = [TestTcpCommunication]
|
||||
suite = unittest.TestLoader().loadTestsFromTestCase(unittest.TestCase)
|
||||
for testCase in testList:
|
||||
suite.addTest(unittest.makeSuite(testCase))
|
||||
result = unittest.TestResult()
|
||||
suite.run(result)
|
||||
print result
|
||||
Reference in New Issue
Block a user