1#!/usr/bin/env python
2
# WebSocket client example: sends a name to the server and prints the greeting it returns.
4
5import asyncio
6import websockets
7
async def hello():
    """Send a name to the local WebSocket server and print its reply.

    Opens a connection to ``ws://localhost:8765``, prompts on stdin for a
    name, sends it, then waits for exactly one message back and prints it.
    The connection is closed when the ``async with`` block exits.
    """
    uri = "ws://localhost:8765"
    async with websockets.connect(uri) as websocket:
        # input() blocks the calling thread.  Running it in the default
        # executor keeps the event loop free to service the already-open
        # connection (e.g. keepalive traffic) while the user is typing.
        loop = asyncio.get_event_loop()
        name = await loop.run_in_executor(None, input, "What's your name? ")

        await websocket.send(name)
        print(f"> {name}")

        greeting = await websocket.recv()
        print(f"< {greeting}")
18
# Drive the client coroutine to completion on the default event loop.
asyncio.get_event_loop().run_until_complete(hello())
20
1#!/usr/bin/env python
2
# WebSocket server example: receives a name and replies with a greeting.
4
5import asyncio
6import websockets
7
async def hello(websocket, path=None):
    """Handle one connection: read a name and answer with a greeting.

    Args:
        websocket: Connection object exposing awaitable ``recv()`` and
            ``send(message)`` methods.
        path: Request path supplied by callers that pass a second argument.
            It is not used here and defaults to ``None`` so the handler can
            also be invoked with the connection alone.
    """
    name = await websocket.recv()
    print(f"< {name}")

    greeting = f"Hello {name}!"

    await websocket.send(greeting)
    print(f"> {greeting}")
16
# Build the server for the greeting handler on localhost:8765.
start_server = websockets.serve(hello, "localhost", 8765)

# Start the server on the default event loop, then keep the loop running so
# connections continue to be served until the process is stopped.
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
1#!/usr/bin/env python
2
# WebSocket server that sends the current UTC time at random intervals
4
5import asyncio
6import datetime
7import random
8import websockets
9
async def time(websocket, path=None):
    """Send the current UTC timestamp over the connection, forever.

    Each message is an ISO 8601 string with a trailing ``"Z"``.  After every
    send the handler sleeps for a random duration in ``[0, 3)`` seconds.
    The loop has no exit condition of its own; it ends only when ``send``
    or the sleep raises.

    Note: the function name matches the stdlib ``time`` module; it is kept
    because the server start-up code refers to the handler by this name.

    Args:
        websocket: Connection object exposing an awaitable ``send(message)``.
        path: Request path supplied by callers that pass a second argument.
            It is not used here and defaults to ``None`` so the handler can
            also be invoked with the connection alone.
    """
    while True:
        # datetime.utcnow() is deprecated; take an aware UTC "now" and drop
        # the tzinfo so isoformat() yields the same naive text as before
        # (no "+00:00" suffix) ahead of the explicit "Z".
        utc_now = datetime.datetime.now(datetime.timezone.utc)
        now = utc_now.replace(tzinfo=None).isoformat() + "Z"
        await websocket.send(now)
        await asyncio.sleep(random.random() * 3)
15
# Build the server for the time handler on 127.0.0.1:5678.
start_server = websockets.serve(time, "127.0.0.1", 5678)

# Start the server on the default event loop, then keep the loop running so
# connections continue to be served until the process is stopped.
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
20